1mod computation_coalescing;
45mod download;
46mod file_creation;
47mod poll_all;
48mod remotely_fed_cursor;
49
50use std::future::Future;
51use std::io::{BufReader, Read, Seek, Write};
52use std::ops::Deref;
53use std::path::{Path, PathBuf};
54use std::pin::Pin;
55use std::sync::atomic::AtomicU64;
56use std::sync::Arc;
57use std::time::Duration;
58
59use futures_util::{future, AsyncReadExt};
60use tokio::io::AsyncWriteExt;
61use tokio::time::Instant;
62
63use computation_coalescing::ComputationCoalescer;
64use download::response_to_uncompressed_stream_with_progress;
65use file_creation::{create_file_cleanly, CleanFileCreationError};
66use poll_all::PollAllPreservingOrder;
67use remotely_fed_cursor::{RemotelyFedCursor, RemotelyFedCursorFeeder};
68
69#[derive(Clone, Debug, PartialEq, Eq)]
72pub enum NtSymbolPathEntry {
73 Cache(PathBuf),
76 Chain {
79 dll: String,
81 cache_paths: Vec<CachePath>,
86 urls: Vec<String>,
89 },
90 LocalOrShare(PathBuf),
93}
94
95#[derive(Clone, Debug, PartialEq, Eq)]
97pub enum CachePath {
98 DefaultDownstreamStore,
102
103 Path(PathBuf),
105}
106
107impl CachePath {
108 pub fn to_path<'a>(&'a self, default_downstream_store: &'a Path) -> &'a Path {
109 match self {
110 CachePath::DefaultDownstreamStore => default_downstream_store,
111 CachePath::Path(path) => path,
112 }
113 }
114}
115
116pub fn get_home_sym_dir() -> Option<PathBuf> {
121 let home_dir = dirs::home_dir()?;
122 Some(home_dir.join("sym"))
123}
124
125pub fn get_symbol_path_from_environment() -> Option<String> {
127 std::env::var("_NT_SYMBOL_PATH").ok()
128}
129
130pub fn parse_nt_symbol_path(symbol_path: &str) -> Vec<NtSymbolPathEntry> {
134 fn chain<'a>(dll_name: &str, parts: impl Iterator<Item = &'a str>) -> NtSymbolPathEntry {
135 let mut cache_paths = Vec::new();
136 let mut urls = Vec::new();
137 for part in parts {
138 if part.is_empty() {
139 cache_paths.push(CachePath::DefaultDownstreamStore);
140 } else if part.starts_with("http://") || part.starts_with("https://") {
141 urls.push(part.into());
142 } else {
143 cache_paths.push(CachePath::Path(part.into()));
144 }
145 }
146 NtSymbolPathEntry::Chain {
147 dll: dll_name.to_string(),
148 cache_paths,
149 urls,
150 }
151 }
152
153 symbol_path
154 .split(';')
155 .filter_map(|segment| {
156 let mut parts = segment.split('*');
157 let first = parts.next().unwrap();
158 match first.to_ascii_lowercase().as_str() {
159 "cache" => parts
160 .next()
161 .map(|path| NtSymbolPathEntry::Cache(path.into())),
162 "srv" => Some(chain("symsrv.dll", parts)),
163 "symsrv" => parts.next().map(|dll_name| chain(dll_name, parts)),
164 _ => Some(NtSymbolPathEntry::LocalOrShare(first.into())),
165 }
166 })
167 .collect()
168}
169
170#[derive(thiserror::Error, Debug, Clone)]
172#[non_exhaustive]
173pub enum Error {
174 #[error("IO error: {0}")]
176 IoError(String),
177
178 #[error("The file was not found in the SymsrvDownloader.")]
180 NotFound,
181
182 #[error("No default downstream store was specified, but it was needed.")]
184 NoDefaultDownstreamStore,
185
186 #[error("The requested path does not have a file extension.")]
188 NoExtension,
189
190 #[error("The requested path does not have a recognized file extension (exe/dll/pdb/dbg).")]
192 UnrecognizedExtension,
193
194 #[error("An internal error occurred: Couldn't join task")]
196 JoinError(String),
197
198 #[error("ReqwestError: {0}")]
200 ReqwestError(String),
201
202 #[error("Unexpected Content-Encoding header: {0}")]
204 UnexpectedContentEncoding(String),
205
206 #[error("Error while extracting a CAB archive: {0}")]
208 CabExtraction(String),
209}
210
211impl From<std::io::Error> for Error {
212 fn from(err: std::io::Error) -> Error {
213 Error::IoError(err.to_string())
214 }
215}
216
217impl From<CleanFileCreationError<Error>> for Error {
218 fn from(e: CleanFileCreationError<Error>) -> Error {
219 match e {
220 CleanFileCreationError::CallbackIndicatedError(e) => e,
221 e => Error::IoError(e.to_string()),
222 }
223 }
224}
225
226#[derive(thiserror::Error, Debug)]
228pub enum DownloadError {
229 #[error("Creating the client failed: {0}")]
231 ClientCreationFailed(String),
232
233 #[error("Opening the request failed: {0}")]
235 OpenFailed(Box<dyn std::error::Error + Send + Sync>),
236
237 #[error("The download timed out")]
239 Timeout,
240
241 #[error("The server returned status code {0}")]
243 StatusError(http::StatusCode),
244
245 #[error("The destination directory could not be created")]
247 CouldNotCreateDestinationDirectory,
248
249 #[error("The response used an unexpected Content-Encoding: {0}")]
251 UnexpectedContentEncoding(String),
252
253 #[error("Error during downloading: {0}")]
255 ErrorDuringDownloading(std::io::Error),
256
257 #[error("Error while writing the downloaded file: {0}")]
259 ErrorWhileWritingDownloadedFile(std::io::Error),
260
261 #[error("Redirect-related error")]
263 Redirect(Box<dyn std::error::Error + Send + Sync>),
264
265 #[error("Other error: {0}")]
267 Other(Box<dyn std::error::Error + Send + Sync>),
268}
269
270#[derive(thiserror::Error, Debug)]
272pub enum CabExtractionError {
273 #[error("Empty CAB archive")]
275 EmptyCab,
276
277 #[error("Could not open CAB file: {0}")]
279 CouldNotOpenCabFile(std::io::Error),
280
281 #[error("Error while parsing the CAB file: {0}")]
283 CabParsing(std::io::Error),
284
285 #[error("Error while reading the CAB file: {0}")]
287 CabReading(std::io::Error),
288
289 #[error("Error while writing the file: {0}")]
291 FileWriting(std::io::Error),
292
293 #[error("Redirect-related error")]
295 Redirect(Box<dyn std::error::Error + Send + Sync>),
296
297 #[error("Other error: {0}")]
299 Other(Box<dyn std::error::Error + Send + Sync>),
300}
301
302#[cfg(test)]
303#[test]
304fn test_download_error_is_sync() {
305 fn assert_sync<T: Sync>() {}
306 assert_sync::<DownloadError>();
307}
308
309impl From<reqwest::Error> for DownloadError {
310 fn from(e: reqwest::Error) -> Self {
311 if e.is_status() {
312 DownloadError::StatusError(e.status().unwrap())
313 } else if e.is_request() {
314 DownloadError::OpenFailed(e.into())
315 } else if e.is_redirect() {
316 DownloadError::Redirect(e.into())
317 } else if e.is_timeout() {
318 DownloadError::Timeout
319 } else {
320 DownloadError::Other(e.into())
321 }
322 }
323}
324
325pub trait SymsrvObserver: Send + Sync + 'static {
328 fn on_new_download_before_connect(&self, download_id: u64, url: &str);
336
337 fn on_download_started(&self, download_id: u64);
340
341 fn on_download_progress(&self, download_id: u64, bytes_so_far: u64, total_bytes: Option<u64>);
350
351 fn on_download_completed(
356 &self,
357 download_id: u64,
358 uncompressed_size_in_bytes: u64,
359 time_until_headers: Duration,
360 time_until_completed: Duration,
361 );
362
363 fn on_download_failed(&self, download_id: u64, reason: DownloadError);
372
373 fn on_download_canceled(&self, download_id: u64);
384
385 fn on_new_cab_extraction(&self, extraction_id: u64, dest_path: &Path);
387
388 fn on_cab_extraction_progress(&self, extraction_id: u64, bytes_so_far: u64, total_bytes: u64);
390
391 fn on_cab_extraction_completed(
393 &self,
394 extraction_id: u64,
395 uncompressed_size_in_bytes: u64,
396 time_until_completed: Duration,
397 );
398
399 fn on_cab_extraction_failed(&self, extraction_id: u64, reason: CabExtractionError);
401
402 fn on_cab_extraction_canceled(&self, extraction_id: u64);
404
405 fn on_file_created(&self, path: &Path, size_in_bytes: u64);
409
410 fn on_file_accessed(&self, path: &Path);
417
418 fn on_file_missed(&self, path: &Path);
423}
424
425static NEXT_DOWNLOAD_OR_EXTRACTION_ID: AtomicU64 = AtomicU64::new(0);
426
427pub struct SymsrvDownloader {
432 inner: Arc<SymsrvDownloaderInner>,
433 inflight_request_cache:
434 ComputationCoalescer<(String, String, bool), PinBoxDynFuture<Result<PathBuf, Error>>>,
435}
436
437type PinBoxDynFuture<T> = Pin<Box<dyn Future<Output = T> + Send + Sync>>;
438
439struct SymsrvDownloaderInner {
440 symbol_path: Vec<NtSymbolPathEntry>,
441 default_downstream_store: Option<PathBuf>,
442 observer: Option<Arc<dyn SymsrvObserver>>,
443 reqwest_client: Result<reqwest::Client, reqwest::Error>,
444}
445
446#[cfg(test)]
447#[test]
448fn test_symsrv_downloader_error_is_send_and_sync() {
449 fn assert_send<T: Send>() {}
450 fn assert_sync<T: Sync>() {}
451 assert_send::<SymsrvDownloader>();
452 assert_sync::<SymsrvDownloader>();
453}
454
455impl SymsrvDownloader {
456 pub fn new(symbol_path: Vec<NtSymbolPathEntry>) -> Self {
476 Self {
477 inner: Arc::new(SymsrvDownloaderInner::new(symbol_path)),
478 inflight_request_cache: ComputationCoalescer::new(),
479 }
480 }
481
482 pub fn set_observer(&mut self, observer: Option<Arc<dyn SymsrvObserver>>) {
489 Arc::get_mut(&mut self.inner).unwrap().observer = observer;
490 }
491
492 pub fn set_default_downstream_store<P: Into<PathBuf>>(
508 &mut self,
509 default_downstream_store: Option<P>,
510 ) {
511 Arc::get_mut(&mut self.inner)
512 .unwrap()
513 .default_downstream_store = default_downstream_store.map(Into::into);
514 }
515
516 pub async fn get_file(&self, filename: &str, hash: &str) -> Result<PathBuf, Error> {
545 self.get_file_impl(filename, hash, true).await
546 }
547
548 pub async fn get_file_no_download(&self, filename: &str, hash: &str) -> Result<PathBuf, Error> {
551 self.get_file_impl(filename, hash, false).await
552 }
553
554 async fn get_file_impl(
555 &self,
556 filename: &str,
557 hash: &str,
558 allow_downloads: bool,
559 ) -> Result<PathBuf, Error> {
560 let inner = self.inner.clone();
561 let filename = filename.to_owned();
562 let hash = hash.to_owned();
563
564 self.inflight_request_cache
565 .subscribe_or_compute(
566 &(filename.clone(), hash.clone(), allow_downloads),
567 move || {
568 let f =
569 async move { inner.get_file_impl(&filename, &hash, allow_downloads).await };
570 Box::pin(f)
571 },
572 )
573 .await
574 }
575}
576
577impl SymsrvDownloaderInner {
578 pub fn new(symbol_path: Vec<NtSymbolPathEntry>) -> Self {
579 let builder = reqwest::Client::builder();
580
581 let builder = builder.http1_only();
583
584 let builder = builder.no_gzip().no_brotli().no_deflate();
589
590 let client = builder.build();
593
594 Self {
595 symbol_path,
596 default_downstream_store: None,
597 observer: None,
598 reqwest_client: client,
599 }
600 }
601
602 pub async fn get_file_impl(
603 &self,
604 filename: &str,
605 hash: &str,
606 allow_downloads: bool,
607 ) -> Result<PathBuf, Error> {
608 let path: PathBuf = [filename, hash, filename].iter().collect();
609 let rel_path_uncompressed = &path;
610 let rel_path_compressed = create_compressed_path(rel_path_uncompressed)?;
611
612 let mut persisted_cache_paths: Vec<CachePath> = Vec::new();
616
617 for entry in &self.symbol_path {
619 match entry {
620 NtSymbolPathEntry::Cache(cache_dir) => {
621 let cache_path = CachePath::Path(cache_dir.into());
622 if persisted_cache_paths.contains(&cache_path) {
623 continue;
624 }
625
626 if let Some(found_path) = self
629 .check_directory(
630 cache_dir,
631 &persisted_cache_paths,
632 rel_path_uncompressed,
633 &rel_path_compressed,
634 )
635 .await?
636 {
637 return Ok(found_path);
638 }
639
640 persisted_cache_paths.push(cache_path);
643 }
644 NtSymbolPathEntry::Chain {
645 cache_paths, urls, ..
646 } => {
647 let mut parent_cache_paths = persisted_cache_paths.clone();
650
651 for cache_path in cache_paths {
652 if parent_cache_paths.contains(cache_path) {
653 continue;
654 }
655 parent_cache_paths.push(cache_path.clone());
656
657 let (_, parent_cache_paths) = parent_cache_paths.split_last().unwrap();
660 if let Some(cache_dir) = self.resolve_cache_path(cache_path) {
661 if let Some(found_path) = self
662 .check_directory(
663 cache_dir,
664 parent_cache_paths,
665 rel_path_uncompressed,
666 &rel_path_compressed,
667 )
668 .await?
669 {
670 return Ok(found_path);
671 }
672 }
673 }
674
675 if !allow_downloads {
678 continue;
680 }
681
682 let (download_dest_cache, remaining_caches) = parent_cache_paths
684 .split_last()
685 .unwrap_or((&CachePath::DefaultDownstreamStore, &[]));
686 let download_dest_cache_dir = self
687 .resolve_cache_path(download_dest_cache)
688 .ok_or(Error::NoDefaultDownstreamStore)?;
689 let bottom_cache = parent_cache_paths
690 .first()
691 .unwrap_or(&CachePath::DefaultDownstreamStore);
692
693 let mut file_urls = Vec::with_capacity(urls.len() * 2);
696 for server_url in urls {
697 file_urls.push((
698 url_join(server_url, rel_path_uncompressed.components()),
699 false,
700 ));
701 file_urls
702 .push((url_join(server_url, rel_path_compressed.components()), true));
703 }
704
705 let response_futures: Vec<_> = file_urls
707 .into_iter()
708 .map(|(file_url, is_compressed)| async move {
709 (
710 self.prepare_download_of_file(&file_url).await,
711 is_compressed,
712 )
713 })
714 .map(Box::pin)
715 .collect();
716
717 let Some((notifier, response, is_compressed)) = async {
720 let mut response_futures = PollAllPreservingOrder::new(response_futures);
721 while let Some(next_response) = response_futures.next().await {
722 let (prepared_response, is_compressed) = next_response;
723 if let Some((notifier, response)) = prepared_response {
724 return Some((notifier, response, is_compressed));
726 };
727 }
728 None
729 }
730 .await
731 else {
732 continue;
735 };
736
737 let uncompressed_dest_path = if is_compressed {
741 let (rx, tx) = remotely_fed_cursor::create_cursor_channel();
742 let download_dest_path_future = self.download_file_to_cache(
743 notifier,
744 response,
745 &rel_path_compressed,
746 download_dest_cache_dir,
747 Some(tx),
748 );
749 let extraction_result_future = self.extract_to_file_in_cache(
750 CabDataSource::Cursor(rx),
751 rel_path_uncompressed,
752 bottom_cache,
753 );
754 let (download_dest_path, extraction_result) =
755 future::join(download_dest_path_future, extraction_result_future).await;
756 let Some(dest_path) = download_dest_path else {
757 continue;
758 };
759
760 if let Some((_remaining_bottom_cache, remaining_mid_level_caches)) =
762 remaining_caches.split_first()
763 {
764 self.copy_file_to_caches(
766 &rel_path_compressed,
767 &dest_path,
768 remaining_mid_level_caches,
769 )
770 .await;
771 }
772
773 extraction_result?
775 } else {
776 let dest_path = self
777 .download_file_to_cache(
778 notifier,
779 response,
780 rel_path_uncompressed,
781 download_dest_cache_dir,
782 None,
783 )
784 .await;
785 let Some(dest_path) = dest_path else { continue };
786
787 self.copy_file_to_caches(
789 rel_path_uncompressed,
790 &dest_path,
791 remaining_caches,
792 )
793 .await;
794 dest_path
795 };
796 return Ok(uncompressed_dest_path);
797 }
798 NtSymbolPathEntry::LocalOrShare(dir_path) => {
799 if persisted_cache_paths.contains(&CachePath::Path(dir_path.into())) {
800 continue;
801 }
802
803 if let Some(found_path) = self
806 .check_directory(
807 dir_path,
808 &persisted_cache_paths,
809 rel_path_uncompressed,
810 &rel_path_compressed,
811 )
812 .await?
813 {
814 return Ok(found_path);
815 };
816 }
817 }
818 }
819 Err(Error::NotFound)
820 }
821
822 async fn check_file_exists(&self, path: &Path) -> bool {
824 let file_exists = matches!(tokio::fs::metadata(path).await, Ok(meta) if meta.is_file());
825 if !file_exists {
826 if let Some(observer) = self.observer.as_deref() {
827 observer.on_file_missed(path);
828 }
829 }
830 file_exists
831 }
832
833 fn resolve_cache_path<'a>(&'a self, cache_path: &'a CachePath) -> Option<&'a Path> {
834 match cache_path {
835 CachePath::Path(path) => Some(path),
836 CachePath::DefaultDownstreamStore => self.default_downstream_store.as_deref(),
837 }
838 }
839
840 async fn check_directory(
846 &self,
847 dir: &Path,
848 parent_cache_paths: &[CachePath],
849 rel_path_uncompressed: &Path,
850 rel_path_compressed: &Path,
851 ) -> Result<Option<PathBuf>, Error> {
852 let full_candidate_path = dir.join(rel_path_uncompressed);
853 let full_candidate_path_compr = dir.join(rel_path_compressed);
854
855 let (abs_path, is_compressed) = if self.check_file_exists(&full_candidate_path).await {
856 (full_candidate_path, false)
857 } else if self.check_file_exists(&full_candidate_path_compr).await {
858 (full_candidate_path_compr, true)
859 } else {
860 return Ok(None);
861 };
862
863 if let Some(observer) = self.observer.as_deref() {
866 observer.on_file_accessed(&abs_path);
867 }
868
869 let uncompressed_path = if is_compressed {
870 if let Some((bottom_most_cache, mid_level_caches)) = parent_cache_paths.split_first() {
871 self.copy_file_to_caches(rel_path_compressed, &abs_path, mid_level_caches)
875 .await;
876 self.extract_to_file_in_cache(
877 CabDataSource::Filename(abs_path.clone()),
878 rel_path_uncompressed,
879 bottom_most_cache,
880 )
881 .await?
882 } else {
883 self.extract_to_file_in_cache(
885 CabDataSource::Filename(abs_path.clone()),
886 rel_path_uncompressed,
887 &CachePath::DefaultDownstreamStore,
888 )
889 .await?
890 }
891 } else {
892 abs_path
893 };
894
895 Ok(Some(uncompressed_path))
896 }
897
898 async fn copy_file_to_caches(&self, rel_path: &Path, abs_path: &Path, caches: &[CachePath]) {
901 for cache_path in caches {
902 if let Some(cache_dir) = self.resolve_cache_path(cache_path) {
903 if let Ok(dest_path) = self
904 .make_dest_path_and_ensure_parent_dirs(rel_path, cache_dir)
905 .await
906 {
907 if let Ok(copied_bytes) = tokio::fs::copy(&abs_path, &dest_path).await {
910 if let Some(observer) = self.observer.as_deref() {
911 observer.on_file_created(&dest_path, copied_bytes);
912 }
913 }
914 }
915 }
916 }
917 }
918
919 async fn make_dest_path_and_ensure_parent_dirs(
923 &self,
924 rel_path: &Path,
925 cache_path: &Path,
926 ) -> Result<PathBuf, std::io::Error> {
927 let dest_path = cache_path.join(rel_path);
928 if let Some(dir) = dest_path.parent() {
929 tokio::fs::create_dir_all(dir).await?;
930 }
931 Ok(dest_path)
932 }
933
934 async fn extract_to_file_in_cache(
937 &self,
938 cab_data_source: CabDataSource,
939 rel_path: &Path,
940 cache_path: &CachePath,
941 ) -> Result<PathBuf, Error> {
942 let cache_path = self
943 .resolve_cache_path(cache_path)
944 .ok_or(Error::NoDefaultDownstreamStore)?;
945 let dest_path = self
946 .make_dest_path_and_ensure_parent_dirs(rel_path, cache_path)
947 .await?;
948
949 let notifier = {
950 let observer = self.observer.clone();
951 let extraction_id =
952 NEXT_DOWNLOAD_OR_EXTRACTION_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
953 if let Some(observer) = observer.as_deref() {
954 observer.on_new_cab_extraction(extraction_id, &dest_path);
955 }
956 ExtractionStatusReporter::new(extraction_id, observer)
957 };
958 let extraction_id = notifier.extraction_id();
959
960 let observer = self.observer.clone();
961 let extracted_size_result = create_file_cleanly(
962 &dest_path,
963 |mut dest_file: std::fs::File| async {
964 tokio::task::spawn_blocking(move || match cab_data_source {
965 CabDataSource::Filename(compressed_input_path) => {
966 let file = std::fs::File::open(compressed_input_path)
967 .map_err(CabExtractionError::CouldNotOpenCabFile)?;
968 let buf_read = BufReader::new(file);
969 extract_cab_to_file(extraction_id, buf_read, &mut dest_file, observer)
970 }
971 CabDataSource::Cursor(cursor) => {
972 extract_cab_to_file(extraction_id, cursor, &mut dest_file, observer)
973 }
974 })
975 .await
976 .expect("task panicked")
977 },
978 || async {
979 let size = std::fs::metadata(&dest_path)
980 .map_err(|_| {
981 CabExtractionError::Other(
982 "Could not get size of existing extracted file".into(),
983 )
984 })?
985 .len();
986 Ok(size)
987 },
988 )
989 .await;
990
991 let extracted_size = match extracted_size_result {
992 Ok(size) => size,
993 Err(e) => {
994 let error = Error::CabExtraction(format!("{}", e));
995 match e {
996 CleanFileCreationError::CallbackIndicatedError(e) => {
997 notifier.extraction_failed(e);
998 }
999 _ => {
1000 notifier.extraction_failed(CabExtractionError::FileWriting(e.into()));
1001 }
1002 }
1003 return Err(error);
1004 }
1005 };
1006
1007 notifier.extraction_completed(extracted_size, Instant::now());
1008
1009 if let Some(observer) = self.observer.as_deref() {
1010 observer.on_file_created(&dest_path, extracted_size);
1011 }
1012 Ok(dest_path)
1013 }
1014
1015 async fn prepare_download_of_file(
1016 &self,
1017 url: &str,
1018 ) -> Option<(DownloadStatusReporter, reqwest::Response)> {
1019 let download_id =
1020 NEXT_DOWNLOAD_OR_EXTRACTION_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1021 if let Some(observer) = self.observer.as_deref() {
1022 observer.on_new_download_before_connect(download_id, url);
1023 }
1024
1025 let reporter = DownloadStatusReporter::new(download_id, self.observer.clone());
1026
1027 let reqwest_client = match self.reqwest_client.as_ref() {
1028 Ok(client) => client,
1029 Err(e) => {
1030 reporter.download_failed(DownloadError::ClientCreationFailed(e.to_string()));
1031 return None;
1032 }
1033 };
1034
1035 let request_builder = reqwest_client.get(url);
1036
1037 let request_builder = request_builder.header("Accept-Encoding", "gzip");
1041
1042 let response_result = request_builder.send().await;
1044
1045 let response_result = response_result.and_then(|response| response.error_for_status());
1047
1048 let response = match response_result {
1049 Ok(response) => response,
1050 Err(e) => {
1051 reporter.download_failed(DownloadError::from(e));
1053 return None;
1054 }
1055 };
1056
1057 Some((reporter, response))
1058 }
1059
1060 async fn download_file_to_cache(
1062 &self,
1063 reporter: DownloadStatusReporter,
1064 response: reqwest::Response,
1065 rel_path: &Path,
1066 cache_dir: &Path,
1067 mut chunk_consumer: Option<RemotelyFedCursorFeeder>,
1068 ) -> Option<PathBuf> {
1069 let ts_after_status = Instant::now();
1071 let download_id = reporter.download_id();
1072 if let Some(observer) = self.observer.as_deref() {
1073 observer.on_download_started(download_id);
1074 }
1075
1076 let dest_path = match self
1077 .make_dest_path_and_ensure_parent_dirs(rel_path, cache_dir)
1078 .await
1079 {
1080 Ok(dest_path) => dest_path,
1081 Err(_e) => {
1082 reporter.download_failed(DownloadError::CouldNotCreateDestinationDirectory);
1083 return None;
1084 }
1085 };
1086
1087 let observer = self.observer.clone();
1088 let mut stream = match response_to_uncompressed_stream_with_progress(
1089 response,
1090 move |bytes_so_far, total_bytes| {
1091 if let Some(observer) = observer.as_deref() {
1092 observer.on_download_progress(download_id, bytes_so_far, total_bytes)
1093 }
1094 },
1095 ) {
1096 Ok(stream) => stream,
1097 Err(download::Error::UnexpectedContentEncoding(encoding)) => {
1098 reporter.download_failed(DownloadError::UnexpectedContentEncoding(encoding));
1099 return None;
1100 }
1101 };
1102
1103 let download_result: Result<u64, CleanFileCreationError<std::io::Error>> =
1104 create_file_cleanly(
1105 &dest_path,
1106 |dest_file: std::fs::File| async move {
1107 let mut dest_file = tokio::fs::File::from_std(dest_file);
1108 let mut buf = vec![0u8; 2 * 1024 * 1024];
1109 let mut uncompressed_size_in_bytes = 0;
1110 loop {
1111 let count = stream.read(&mut buf).await?;
1112 if count == 0 {
1113 break;
1114 }
1115 uncompressed_size_in_bytes += count as u64;
1116 dest_file.write_all(&buf[..count]).await?;
1117
1118 if let Some(chunk_consumer) = &mut chunk_consumer {
1119 chunk_consumer.feed(&buf[..count]);
1120 }
1121 }
1122 if let Some(chunk_consumer) = &mut chunk_consumer {
1123 chunk_consumer.mark_complete();
1124 }
1125 dest_file.flush().await?;
1126 Ok(uncompressed_size_in_bytes)
1127 },
1128 || async {
1129 let size = std::fs::metadata(&dest_path)?.len();
1130 Ok(size)
1131 },
1132 )
1133 .await;
1134
1135 let uncompressed_size_in_bytes = match download_result {
1136 Ok(size) => size,
1137 Err(CleanFileCreationError::CallbackIndicatedError(e)) => {
1138 reporter.download_failed(DownloadError::ErrorDuringDownloading(e));
1139 return None;
1140 }
1141 Err(e) => {
1142 reporter.download_failed(DownloadError::ErrorWhileWritingDownloadedFile(e.into()));
1143 return None;
1144 }
1145 };
1146
1147 let ts_after_download = Instant::now();
1148 reporter.download_completed(
1149 uncompressed_size_in_bytes,
1150 ts_after_status,
1151 ts_after_download,
1152 );
1153
1154 if let Some(observer) = self.observer.as_deref() {
1155 observer.on_file_created(&dest_path, uncompressed_size_in_bytes);
1156 }
1157
1158 Some(dest_path)
1159 }
1160}
1161
1162enum CabDataSource {
1163 Filename(PathBuf),
1164 Cursor(RemotelyFedCursor),
1165}
1166
1167fn get_first_file_entry<R: Read + Seek>(cabinet: &mut cab::Cabinet<R>) -> Option<(String, u64)> {
1168 for folder in cabinet.folder_entries() {
1169 if let Some(file) = folder.file_entries().next() {
1170 return Some((file.name().to_owned(), file.uncompressed_size().into()));
1171 }
1172 }
1173 None
1174}
1175
1176fn extract_cab_to_file<R: Read + Seek>(
1177 extraction_id: u64,
1178 source_data: R,
1179 dest_file: &mut std::fs::File,
1180 observer: Option<Arc<dyn SymsrvObserver>>,
1181) -> Result<u64, CabExtractionError> {
1182 use CabExtractionError::*;
1183 let mut cabinet = cab::Cabinet::new(source_data).map_err(CabParsing)?;
1184 let (file_entry_name, file_extracted_size) =
1185 get_first_file_entry(&mut cabinet).ok_or(EmptyCab)?;
1186 let mut reader = cabinet.read_file(&file_entry_name).map_err(CabParsing)?;
1187
1188 let mut bytes_written = 0;
1189 loop {
1190 let mut buf = [0; 4096];
1191 let bytes_read = reader.read(&mut buf).map_err(CabReading)?;
1192 if bytes_read == 0 {
1193 break;
1194 }
1195 dest_file
1196 .write_all(&buf[..bytes_read])
1197 .map_err(FileWriting)?;
1198 bytes_written += bytes_read as u64;
1199
1200 if let Some(observer) = observer.as_deref() {
1201 observer.on_cab_extraction_progress(extraction_id, bytes_written, file_extracted_size);
1202 }
1203 }
1204
1205 Ok(bytes_written)
1206}
1207
1208fn url_join(base_url: &str, components: std::path::Components) -> String {
1211 format!(
1212 "{}/{}",
1213 base_url.trim_end_matches('/'),
1214 components
1215 .map(|c| c.as_os_str().to_string_lossy())
1216 .collect::<Vec<_>>()
1217 .join("/")
1218 )
1219}
1220
1221fn create_compressed_path(uncompressed_path: &Path) -> Result<PathBuf, Error> {
1225 let uncompressed_ext = match uncompressed_path.extension() {
1226 Some(ext) => match ext.to_string_lossy().deref() {
1227 "exe" => "ex_",
1228 "dll" => "dl_",
1229 "pdb" => "pd_",
1230 "dbg" => "db_",
1231 _ => return Err(Error::UnrecognizedExtension),
1232 },
1233 None => return Err(Error::NoExtension),
1234 };
1235
1236 let mut compressed_path = uncompressed_path.to_owned();
1237 compressed_path.set_extension(uncompressed_ext);
1238 Ok(compressed_path)
1239}
1240
1241struct DownloadStatusReporter {
1244 download_id: Option<u64>,
1246 observer: Option<Arc<dyn SymsrvObserver>>,
1247 ts_before_connect: Instant,
1248}
1249
1250impl DownloadStatusReporter {
1251 pub fn new(download_id: u64, observer: Option<Arc<dyn SymsrvObserver>>) -> Self {
1252 Self {
1253 download_id: Some(download_id),
1254 observer,
1255 ts_before_connect: Instant::now(),
1256 }
1257 }
1258
1259 pub fn download_id(&self) -> u64 {
1260 self.download_id.unwrap()
1261 }
1262
1263 pub fn download_failed(mut self, e: DownloadError) {
1264 if let (Some(download_id), Some(observer)) = (self.download_id, self.observer.as_deref()) {
1265 observer.on_download_failed(download_id, e);
1266 }
1267 self.download_id = None;
1268 }
1270
1271 pub fn download_completed(
1272 mut self,
1273 uncompressed_size_in_bytes: u64,
1274 ts_after_headers: Instant,
1275 ts_after_completed: Instant,
1276 ) {
1277 if let (Some(download_id), Some(observer)) = (self.download_id, self.observer.as_deref()) {
1278 let time_until_headers = ts_after_headers.duration_since(self.ts_before_connect);
1279 let time_until_completed = ts_after_completed.duration_since(self.ts_before_connect);
1280 observer.on_download_completed(
1281 download_id,
1282 uncompressed_size_in_bytes,
1283 time_until_headers,
1284 time_until_completed,
1285 );
1286 }
1287 self.download_id = None;
1288 }
1290}
1291
1292impl Drop for DownloadStatusReporter {
1293 fn drop(&mut self) {
1294 if let (Some(download_id), Some(observer)) = (self.download_id, self.observer.as_deref()) {
1295 observer.on_download_canceled(download_id);
1299 }
1300 }
1301}
1302
1303struct ExtractionStatusReporter {
1306 extraction_id: Option<u64>,
1308 observer: Option<Arc<dyn SymsrvObserver>>,
1309 ts_before_start: Instant,
1310}
1311
1312impl ExtractionStatusReporter {
1313 pub fn new(extraction_id: u64, observer: Option<Arc<dyn SymsrvObserver>>) -> Self {
1314 Self {
1315 extraction_id: Some(extraction_id),
1316 observer,
1317 ts_before_start: Instant::now(),
1318 }
1319 }
1320
1321 pub fn extraction_id(&self) -> u64 {
1322 self.extraction_id.unwrap()
1323 }
1324
1325 pub fn extraction_failed(mut self, e: CabExtractionError) {
1326 if let (Some(extraction_id), Some(observer)) =
1327 (self.extraction_id, self.observer.as_deref())
1328 {
1329 observer.on_cab_extraction_failed(extraction_id, e);
1330 }
1331 self.extraction_id = None;
1332 }
1334
1335 pub fn extraction_completed(
1336 mut self,
1337 uncompressed_size_in_bytes: u64,
1338 ts_after_completed: Instant,
1339 ) {
1340 if let (Some(extraction_id), Some(observer)) =
1341 (self.extraction_id, self.observer.as_deref())
1342 {
1343 let time_until_completed = ts_after_completed.duration_since(self.ts_before_start);
1344 observer.on_cab_extraction_completed(
1345 extraction_id,
1346 uncompressed_size_in_bytes,
1347 time_until_completed,
1348 );
1349 }
1350 self.extraction_id = None;
1351 }
1353}
1354
1355impl Drop for ExtractionStatusReporter {
1356 fn drop(&mut self) {
1357 if let (Some(extraction_id), Some(observer)) =
1358 (self.extraction_id, self.observer.as_deref())
1359 {
1360 observer.on_cab_extraction_canceled(extraction_id);
1364 }
1365 }
1366}