mod computation_coalescing;
mod download;
mod file_creation;
mod poll_all;
mod remotely_fed_cursor;
use std::future::Future;
use std::io::{BufReader, Read, Seek, Write};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use futures_util::{future, AsyncReadExt};
use tokio::io::AsyncWriteExt;
use tokio::time::Instant;
use computation_coalescing::ComputationCoalescer;
use download::response_to_uncompressed_stream_with_progress;
use file_creation::{create_file_cleanly, CleanFileCreationError};
use poll_all::PollAllPreservingOrder;
use remotely_fed_cursor::{RemotelyFedCursor, RemotelyFedCursorFeeder};
/// One parsed entry of a `;`-separated symbol path string, as produced by
/// [`parse_nt_symbol_path`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NtSymbolPathEntry {
/// A `cache*<path>` segment. Files found by later entries are also stored
/// in this directory.
Cache(PathBuf),
/// A `srv*...` or `symsrv*<dll>*...` segment: a list of cache locations
/// followed by / mixed with server URLs.
Chain {
/// The DLL name: `symsrv.dll` for `srv*` segments, otherwise the name
/// that followed `symsrv*`.
dll: String,
/// The non-URL parts of the segment, in order. An empty part becomes
/// [`CachePath::DefaultDownstreamStore`].
cache_paths: Vec<CachePath>,
/// The parts of the segment that start with `http://` or `https://`.
urls: Vec<String>,
},
/// Any segment that does not start with `cache`, `srv` or `symsrv`; the
/// whole first `*`-delimited part is taken as a directory path.
LocalOrShare(PathBuf),
}
/// A cache location inside a symbol path chain.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CachePath {
/// Placeholder for the default downstream store; it is resolved to a real
/// directory only when a default downstream store has been configured.
DefaultDownstreamStore,
/// An explicitly given cache directory.
Path(PathBuf),
}
impl CachePath {
    /// Resolves this cache location to a directory path.
    ///
    /// An explicit [`CachePath::Path`] resolves to its own path; the
    /// [`CachePath::DefaultDownstreamStore`] placeholder resolves to the
    /// `default_downstream_store` argument.
    pub fn to_path<'a>(&'a self, default_downstream_store: &'a Path) -> &'a Path {
        if let CachePath::Path(explicit_dir) = self {
            explicit_dir.as_path()
        } else {
            default_downstream_store
        }
    }
}
/// Returns `<home directory>/sym`, or `None` if the home directory cannot be
/// determined.
pub fn get_home_sym_dir() -> Option<PathBuf> {
    dirs::home_dir().map(|home_dir| home_dir.join("sym"))
}
/// Reads the `_NT_SYMBOL_PATH` environment variable.
///
/// Returns `None` if the variable is unset or its value is not valid Unicode.
pub fn get_symbol_path_from_environment() -> Option<String> {
    match std::env::var("_NT_SYMBOL_PATH") {
        Ok(symbol_path) => Some(symbol_path),
        Err(_) => None,
    }
}
/// Parses a `;`-separated symbol path string (the `_NT_SYMBOL_PATH` syntax)
/// into its entries.
///
/// Each segment is split on `*`, and its first part (matched
/// case-insensitively) selects the entry kind:
///
/// - `cache*<path>` becomes [`NtSymbolPathEntry::Cache`]; a bare `cache`
///   without a path is dropped.
/// - `srv*<part>*...` becomes [`NtSymbolPathEntry::Chain`] with the DLL name
///   `symsrv.dll`.
/// - `symsrv*<dll>*<part>*...` becomes [`NtSymbolPathEntry::Chain`] with the
///   given DLL name; a bare `symsrv` without a DLL name is dropped.
/// - Anything else becomes [`NtSymbolPathEntry::LocalOrShare`].
///
/// Blank segments (empty input, a trailing `;`, or `;;`) are skipped. They
/// would otherwise turn into a `LocalOrShare("")` entry, i.e. an accidental
/// lookup relative to the current working directory.
pub fn parse_nt_symbol_path(symbol_path: &str) -> Vec<NtSymbolPathEntry> {
    /// Sorts the remaining `*`-separated parts of a chain segment into cache
    /// paths and server URLs, preserving their relative order.
    fn chain<'a>(dll_name: &str, parts: impl Iterator<Item = &'a str>) -> NtSymbolPathEntry {
        let mut cache_paths = Vec::new();
        let mut urls = Vec::new();
        for part in parts {
            if part.is_empty() {
                // `srv**url`: an empty part stands for the default downstream store.
                cache_paths.push(CachePath::DefaultDownstreamStore);
            } else if part.starts_with("http://") || part.starts_with("https://") {
                urls.push(part.to_owned());
            } else {
                cache_paths.push(CachePath::Path(part.into()));
            }
        }
        NtSymbolPathEntry::Chain {
            dll: dll_name.to_owned(),
            cache_paths,
            urls,
        }
    }

    symbol_path
        .split(';')
        .filter(|segment| !segment.trim().is_empty())
        .filter_map(|segment| {
            let mut parts = segment.split('*');
            // `split` always yields at least one item, so this never bails out.
            let first = parts.next()?;
            if first.eq_ignore_ascii_case("cache") {
                parts
                    .next()
                    .map(|path| NtSymbolPathEntry::Cache(path.into()))
            } else if first.eq_ignore_ascii_case("srv") {
                Some(chain("symsrv.dll", parts))
            } else if first.eq_ignore_ascii_case("symsrv") {
                parts.next().map(|dll_name| chain(dll_name, parts))
            } else {
                Some(NtSymbolPathEntry::LocalOrShare(first.into()))
            }
        })
        .collect()
}
/// The error type returned by the file lookup methods of `SymsrvDownloader`.
///
/// Payloads are plain strings, which keeps the type `Clone`.
#[derive(thiserror::Error, Debug, Clone)]
#[non_exhaustive]
pub enum Error {
/// An I/O error, stored as its display text.
#[error("IO error: {0}")]
IoError(String),
/// None of the symbol path entries yielded the requested file.
#[error("The file was not found in the SymsrvDownloader.")]
NotFound,
/// A cache location referred to the default downstream store, but none
/// was configured.
#[error("No default downstream store was specified, but it was needed.")]
NoDefaultDownstreamStore,
/// The requested file name has no extension, so no compressed file name
/// could be derived from it.
#[error("The requested path does not have a file extension.")]
NoExtension,
/// The requested file name has an extension other than
/// `exe`/`dll`/`pdb`/`dbg`.
#[error("The requested path does not have a recognized file extension (exe/dll/pdb/dbg).")]
UnrecognizedExtension,
/// A task could not be joined. Note that the payload is not part of the
/// display message.
#[error("An internal error occurred: Couldn't join task")]
JoinError(String),
/// An error from the HTTP client, stored as text.
#[error("ReqwestError: {0}")]
ReqwestError(String),
/// A response carried a `Content-Encoding` that is not handled.
#[error("Unexpected Content-Encoding header: {0}")]
UnexpectedContentEncoding(String),
/// Extracting a CAB archive failed; the payload is the display text of
/// the underlying error.
#[error("Error while extracting a CAB archive: {0}")]
CabExtraction(String),
}
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Error {
Error::IoError(err.to_string())
}
}
impl From<CleanFileCreationError<Error>> for Error {
fn from(e: CleanFileCreationError<Error>) -> Error {
match e {
CleanFileCreationError::CallbackIndicatedError(e) => e,
e => Error::IoError(e.to_string()),
}
}
}
/// The reason a download failed, as passed to
/// `SymsrvObserver::on_download_failed`.
#[derive(thiserror::Error, Debug)]
pub enum DownloadError {
/// The HTTP client could not be constructed; the payload is the builder
/// error's display text.
#[error("Creating the client failed: {0}")]
ClientCreationFailed(String),
/// Sending the request failed.
#[error("Opening the request failed: {0}")]
OpenFailed(Box<dyn std::error::Error + Send + Sync>),
/// The request timed out.
#[error("The download timed out")]
Timeout,
/// The server answered with an error status code.
#[error("The server returned status code {0}")]
StatusError(http::StatusCode),
/// The directory that should receive the downloaded file could not be
/// created.
#[error("The destination directory could not be created")]
CouldNotCreateDestinationDirectory,
/// The response used a `Content-Encoding` that is not handled.
#[error("The response used an unexpected Content-Encoding: {0}")]
UnexpectedContentEncoding(String),
/// Reading or writing failed inside the download loop.
#[error("Error during downloading: {0}")]
ErrorDuringDownloading(std::io::Error),
/// Creating or finalizing the destination file failed.
#[error("Error while writing the downloaded file: {0}")]
ErrorWhileWritingDownloadedFile(std::io::Error),
/// A redirect-related HTTP client error. The payload is not part of the
/// display message.
#[error("Redirect-related error")]
Redirect(Box<dyn std::error::Error + Send + Sync>),
/// Any other HTTP client error.
#[error("Other error: {0}")]
Other(Box<dyn std::error::Error + Send + Sync>),
}
/// The reason a CAB extraction failed, as passed to
/// `SymsrvObserver::on_cab_extraction_failed`.
#[derive(thiserror::Error, Debug)]
pub enum CabExtractionError {
/// The archive contains no file entry.
#[error("Empty CAB archive")]
EmptyCab,
/// The compressed input file could not be opened.
#[error("Could not open CAB file: {0}")]
CouldNotOpenCabFile(std::io::Error),
/// The archive structure could not be parsed, or the file entry could not
/// be opened for reading.
#[error("Error while parsing the CAB file: {0}")]
CabParsing(std::io::Error),
/// Reading decompressed data from the archive failed.
#[error("Error while reading the CAB file: {0}")]
CabReading(std::io::Error),
/// Writing the extracted data to the destination file failed.
#[error("Error while writing the file: {0}")]
FileWriting(std::io::Error),
/// NOTE(review): not constructed anywhere in this file; it mirrors the
/// `DownloadError` variant of the same name. The payload is not part of
/// the display message.
#[error("Redirect-related error")]
Redirect(Box<dyn std::error::Error + Send + Sync>),
/// Any other error.
#[error("Other error: {0}")]
Other(Box<dyn std::error::Error + Send + Sync>),
}
// Compile-time check: `DownloadError` must be `Sync`. The test body does no
// runtime work; it fails to compile if the bound is not satisfied.
#[cfg(test)]
#[test]
fn test_download_error_is_sync() {
fn assert_sync<T: Sync>() {}
assert_sync::<DownloadError>();
}
/// Classifies a `reqwest::Error` into the matching [`DownloadError`] variant.
impl From<reqwest::Error> for DownloadError {
    fn from(e: reqwest::Error) -> Self {
        // Only report a status error when a status code is actually present,
        // instead of unwrapping.
        if let (true, Some(status)) = (e.is_status(), e.status()) {
            return DownloadError::StatusError(status);
        }
        // The timeout check must come before the request check: reqwest
        // reports a timed-out request as a request error whose source is a
        // timeout, so testing `is_request()` first would hide every timeout
        // behind `OpenFailed`.
        if e.is_timeout() {
            DownloadError::Timeout
        } else if e.is_request() {
            DownloadError::OpenFailed(e.into())
        } else if e.is_redirect() {
            DownloadError::Redirect(e.into())
        } else {
            DownloadError::Other(e.into())
        }
    }
}
/// Callbacks that report what the downloader is doing: downloads, CAB
/// extractions and file accesses.
///
/// Download IDs and extraction IDs are drawn from the same counter, so an ID
/// identifies exactly one download or one extraction.
pub trait SymsrvObserver: Send + Sync + 'static {
/// Called when a new download ID is allocated, before the request for
/// `url` is sent.
fn on_new_download_before_connect(&self, download_id: u64, url: &str);
/// Called once a successful response has been received and the body is
/// about to be written to a cache directory.
fn on_download_started(&self, download_id: u64);
/// Called as the download makes progress. `total_bytes` is `None` when
/// the total is not known.
fn on_download_progress(&self, download_id: u64, bytes_so_far: u64, total_bytes: Option<u64>);
/// Called when the download has been written completely. Both durations
/// are measured from the moment the download ID was allocated.
fn on_download_completed(
&self,
download_id: u64,
uncompressed_size_in_bytes: u64,
time_until_headers: Duration,
time_until_completed: Duration,
);
/// Called when the download failed for the given reason.
fn on_download_failed(&self, download_id: u64, reason: DownloadError);
/// Called when the download was dropped without having been reported as
/// completed or failed.
fn on_download_canceled(&self, download_id: u64);
/// Called when a new extraction ID is allocated, before extraction to
/// `dest_path` starts.
fn on_new_cab_extraction(&self, extraction_id: u64, dest_path: &Path);
/// Called after each chunk of extracted data has been written.
/// `total_bytes` is the uncompressed size recorded in the archive.
fn on_cab_extraction_progress(&self, extraction_id: u64, bytes_so_far: u64, total_bytes: u64);
/// Called when the extraction finished. The duration is measured from the
/// moment the extraction ID was allocated.
fn on_cab_extraction_completed(
&self,
extraction_id: u64,
uncompressed_size_in_bytes: u64,
time_until_completed: Duration,
);
/// Called when the extraction failed for the given reason.
fn on_cab_extraction_failed(&self, extraction_id: u64, reason: CabExtractionError);
/// Called when the extraction was dropped without having been reported as
/// completed or failed.
fn on_cab_extraction_canceled(&self, extraction_id: u64);
/// Called after a file was written to a cache directory by a download,
/// an extraction, or a copy between caches.
fn on_file_created(&self, path: &Path, size_in_bytes: u64);
/// Called when an existing file was found during a directory lookup.
fn on_file_accessed(&self, path: &Path);
/// Called when a candidate path turned out not to be an existing file.
fn on_file_missed(&self, path: &Path);
}
/// Process-wide counter that hands out IDs for both downloads and CAB
/// extractions, so that no download shares an ID with an extraction.
static NEXT_DOWNLOAD_OR_EXTRACTION_ID: AtomicU64 = AtomicU64::new(0);
/// Obtains symbol files by searching the configured symbol path entries and,
/// where allowed, downloading from symbol servers.
pub struct SymsrvDownloader {
// Shared state; a clone of this `Arc` is moved into each lookup future.
inner: Arc<SymsrvDownloaderInner>,
// Coalesces lookups keyed by `(filename, hash, allow_downloads)`.
inflight_request_cache:
ComputationCoalescer<(String, String, bool), PinBoxDynFuture<Result<PathBuf, Error>>>,
}
/// A pinned, boxed, type-erased future that is both `Send` and `Sync`.
type PinBoxDynFuture<T> = Pin<Box<dyn Future<Output = T> + Send + Sync>>;
/// The state behind a `SymsrvDownloader`, shared with in-flight lookups.
struct SymsrvDownloaderInner {
// The symbol path entries, searched in order.
symbol_path: Vec<NtSymbolPathEntry>,
// Directory that `CachePath::DefaultDownstreamStore` resolves to, if set.
default_downstream_store: Option<PathBuf>,
// Optional receiver of progress and file-access notifications.
observer: Option<Arc<dyn SymsrvObserver>>,
// The HTTP client, or the error from building it. A build error is only
// reported once a download is attempted.
reqwest_client: Result<reqwest::Client, reqwest::Error>,
}
// Compile-time check: `SymsrvDownloader` must be both `Send` and `Sync`.
// (Despite the "error" in the test name, the type checked is the downloader.)
#[cfg(test)]
#[test]
fn test_symsrv_downloader_error_is_send_and_sync() {
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}
assert_send::<SymsrvDownloader>();
assert_sync::<SymsrvDownloader>();
}
impl SymsrvDownloader {
    /// Creates a downloader that searches the given symbol path entries in
    /// order. Initially there is no observer and no default downstream store.
    pub fn new(symbol_path: Vec<NtSymbolPathEntry>) -> Self {
        let inner = SymsrvDownloaderInner::new(symbol_path);
        Self {
            inner: Arc::new(inner),
            inflight_request_cache: ComputationCoalescer::new(),
        }
    }

    /// Sets or clears the observer that receives notifications.
    ///
    /// # Panics
    ///
    /// Panics if the shared inner state currently has another owner.
    pub fn set_observer(&mut self, observer: Option<Arc<dyn SymsrvObserver>>) {
        let inner = Arc::get_mut(&mut self.inner).unwrap();
        inner.observer = observer;
    }

    /// Sets or clears the directory that
    /// [`CachePath::DefaultDownstreamStore`] resolves to.
    ///
    /// # Panics
    ///
    /// Panics if the shared inner state currently has another owner.
    pub fn set_default_downstream_store<P: Into<PathBuf>>(
        &mut self,
        default_downstream_store: Option<P>,
    ) {
        let store_dir: Option<PathBuf> = default_downstream_store.map(Into::into);
        let inner = Arc::get_mut(&mut self.inner).unwrap();
        inner.default_downstream_store = store_dir;
    }

    /// Looks up the file identified by `filename` and `hash`, downloading it
    /// from a symbol server if it is not found locally.
    ///
    /// Returns the path of the uncompressed file.
    pub async fn get_file(&self, filename: &str, hash: &str) -> Result<PathBuf, Error> {
        let allow_downloads = true;
        self.get_file_impl(filename, hash, allow_downloads).await
    }

    /// Like [`get_file`](Self::get_file), but never contacts a server: only
    /// the local directories and caches of the symbol path are consulted.
    pub async fn get_file_no_download(&self, filename: &str, hash: &str) -> Result<PathBuf, Error> {
        let allow_downloads = false;
        self.get_file_impl(filename, hash, allow_downloads).await
    }

    /// Routes the lookup through the in-flight request cache, keyed by
    /// `(filename, hash, allow_downloads)`.
    async fn get_file_impl(
        &self,
        filename: &str,
        hash: &str,
        allow_downloads: bool,
    ) -> Result<PathBuf, Error> {
        let owned_filename = filename.to_owned();
        let owned_hash = hash.to_owned();
        let request_key = (owned_filename.clone(), owned_hash.clone(), allow_downloads);
        let shared_inner = Arc::clone(&self.inner);
        self.inflight_request_cache
            .subscribe_or_compute(&request_key, move || {
                // The future owns everything it needs, so it does not borrow
                // from `self` or from the caller's arguments.
                Box::pin(async move {
                    shared_inner
                        .get_file_impl(&owned_filename, &owned_hash, allow_downloads)
                        .await
                })
            })
            .await
    }
}
impl SymsrvDownloaderInner {
/// Creates the inner state for the given symbol path entries.
///
/// The HTTP client is configured as HTTP/1-only with the client's own
/// gzip/brotli/deflate handling switched off. A failure to build the client
/// is stored rather than returned.
pub fn new(symbol_path: Vec<NtSymbolPathEntry>) -> Self {
    let reqwest_client = reqwest::Client::builder()
        .http1_only()
        .no_gzip()
        .no_brotli()
        .no_deflate()
        .build();
    Self {
        symbol_path,
        default_downstream_store: None,
        observer: None,
        reqwest_client,
    }
}
/// Looks up `<filename>/<hash>/<filename>` (or its compressed sibling) by
/// walking the symbol path entries in order.
///
/// Local directories and caches are checked first within each entry. If
/// `allow_downloads` is true, the servers of a chain entry are queried when
/// the chain's caches do not have the file. A file that is found is also
/// stored in the caches that precede the place where it was found.
///
/// Returns the path of the uncompressed file, or `Error::NotFound` if no
/// entry yields it. Fails early if `filename` has no recognized extension.
pub async fn get_file_impl(
&self,
filename: &str,
hash: &str,
allow_downloads: bool,
) -> Result<PathBuf, Error> {
// Relative path of the file inside any store: `<filename>/<hash>/<filename>`.
let path: PathBuf = [filename, hash, filename].iter().collect();
let rel_path_uncompressed = &path;
// Same path with the compressed extension (e.g. `pd_` instead of `pdb`).
let rel_path_compressed = create_compressed_path(rel_path_uncompressed)?;
// Caches from `cache*` entries seen so far. A file found by a later entry
// is also stored in these.
let mut persisted_cache_paths: Vec<CachePath> = Vec::new();
for entry in &self.symbol_path {
match entry {
NtSymbolPathEntry::Cache(cache_dir) => {
let cache_path = CachePath::Path(cache_dir.into());
// Skip cache directories that were already registered.
if persisted_cache_paths.contains(&cache_path) {
continue;
}
// Check this cache directory; on a hit the earlier caches are
// passed along as parent caches.
if let Some(found_path) = self
.check_directory(
cache_dir,
&persisted_cache_paths,
rel_path_uncompressed,
&rel_path_compressed,
)
.await?
{
return Ok(found_path);
}
// Register this cache only after the miss, so that it is a
// parent cache for later entries but not for itself.
persisted_cache_paths.push(cache_path);
}
NtSymbolPathEntry::Chain {
cache_paths, urls, ..
} => {
// Caches for this chain: the persisted caches followed by the
// chain's own caches, without duplicates.
let mut parent_cache_paths = persisted_cache_paths.clone();
for cache_path in cache_paths {
if parent_cache_paths.contains(cache_path) {
continue;
}
parent_cache_paths.push(cache_path.clone());
// The parents of the cache being checked are all caches before
// it, i.e. everything except the element just pushed.
let (_, parent_cache_paths) = parent_cache_paths.split_last().unwrap();
// A cache that cannot be resolved (default downstream store not
// configured) is skipped for the lookup.
if let Some(cache_dir) = self.resolve_cache_path(cache_path) {
if let Some(found_path) = self
.check_directory(
cache_dir,
parent_cache_paths,
rel_path_uncompressed,
&rel_path_compressed,
)
.await?
{
return Ok(found_path);
}
}
}
// Not found in any cache of this chain.
if !allow_downloads {
continue;
}
// The download is written to the last cache of the chain; with no
// caches at all, the default downstream store is used.
let (download_dest_cache, remaining_caches) = parent_cache_paths
.split_last()
.unwrap_or((&CachePath::DefaultDownstreamStore, &[]));
let download_dest_cache_dir = self
.resolve_cache_path(download_dest_cache)
.ok_or(Error::NoDefaultDownstreamStore)?;
// The first cache receives the extracted (uncompressed) file when
// the download turns out to be compressed.
let bottom_cache = parent_cache_paths
.first()
.unwrap_or(&CachePath::DefaultDownstreamStore);
// For every server, try the uncompressed URL and the compressed URL.
// The bool records whether the URL is the compressed variant.
let mut file_urls = Vec::with_capacity(urls.len() * 2);
for server_url in urls {
file_urls.push((
url_join(server_url, rel_path_uncompressed.components()),
false,
));
file_urls
.push((url_join(server_url, rel_path_compressed.components()), true));
}
let response_futures: Vec<_> = file_urls
.into_iter()
.map(|(file_url, is_compressed)| async move {
(
self.prepare_download_of_file(&file_url).await,
is_compressed,
)
})
.map(Box::pin)
.collect();
// Take the first successfully prepared response that
// `PollAllPreservingOrder` yields; if every request failed, move on
// to the next symbol path entry.
let Some((notifier, response, is_compressed)) = async {
let mut response_futures = PollAllPreservingOrder::new(response_futures);
while let Some(next_response) = response_futures.next().await {
let (prepared_response, is_compressed) = next_response;
if let Some((notifier, response)) = prepared_response {
return Some((notifier, response, is_compressed));
};
}
None
}
.await
else {
continue;
};
let uncompressed_dest_path = if is_compressed {
// Download and extraction run concurrently: the download feeds the
// received bytes into a cursor that the extraction reads from.
let (rx, tx) = remotely_fed_cursor::create_cursor_channel();
let download_dest_path_future = self.download_file_to_cache(
notifier,
response,
&rel_path_compressed,
download_dest_cache_dir,
Some(tx),
);
let extraction_result_future = self.extract_to_file_in_cache(
CabDataSource::Cursor(rx),
rel_path_uncompressed,
bottom_cache,
);
let (download_dest_path, extraction_result) =
future::join(download_dest_path_future, extraction_result_future).await;
// A failed download has already been reported to the observer;
// continue with the next symbol path entry.
let Some(dest_path) = download_dest_path else {
continue;
};
// Copy the compressed file to the caches between the first cache
// (which gets the extracted file) and the download destination.
if let Some((_remaining_bottom_cache, remaining_mid_level_caches)) =
remaining_caches.split_first()
{
self.copy_file_to_caches(
&rel_path_compressed,
&dest_path,
remaining_mid_level_caches,
)
.await;
}
extraction_result?
} else {
let dest_path = self
.download_file_to_cache(
notifier,
response,
rel_path_uncompressed,
download_dest_cache_dir,
None,
)
.await;
let Some(dest_path) = dest_path else { continue };
// Copy the uncompressed download to all other caches of the chain.
self.copy_file_to_caches(
rel_path_uncompressed,
&dest_path,
remaining_caches,
)
.await;
dest_path
};
return Ok(uncompressed_dest_path);
}
NtSymbolPathEntry::LocalOrShare(dir_path) => {
// A directory that is already registered as a cache was checked
// when its `cache*` entry was processed.
if persisted_cache_paths.contains(&CachePath::Path(dir_path.into())) {
continue;
}
if let Some(found_path) = self
.check_directory(
dir_path,
&persisted_cache_paths,
rel_path_uncompressed,
&rel_path_compressed,
)
.await?
{
return Ok(found_path);
};
}
}
}
Err(Error::NotFound)
}
/// Returns whether `path` refers to an existing regular file.
///
/// When it does not (including when its metadata cannot be read), the
/// observer is told about the miss.
async fn check_file_exists(&self, path: &Path) -> bool {
    let is_existing_file = match tokio::fs::metadata(path).await {
        Ok(metadata) => metadata.is_file(),
        Err(_) => false,
    };
    if is_existing_file {
        return true;
    }
    if let Some(observer) = self.observer.as_deref() {
        observer.on_file_missed(path);
    }
    false
}
/// Resolves a cache location to a directory.
///
/// Returns `None` only for [`CachePath::DefaultDownstreamStore`] when no
/// default downstream store has been configured.
fn resolve_cache_path<'a>(&'a self, cache_path: &'a CachePath) -> Option<&'a Path> {
    if let CachePath::Path(explicit_dir) = cache_path {
        return Some(explicit_dir.as_path());
    }
    self.default_downstream_store.as_deref()
}
/// Looks for the file in `dir`, first uncompressed and then compressed.
///
/// - Uncompressed hit: its path is returned as-is.
/// - Compressed hit: the compressed file is copied to all parent caches
///   except the first, and is extracted into the first parent cache (or
///   into the default downstream store when there are no parent caches).
///   The path of the extracted file is returned.
/// - Miss: `Ok(None)`.
async fn check_directory(
    &self,
    dir: &Path,
    parent_cache_paths: &[CachePath],
    rel_path_uncompressed: &Path,
    rel_path_compressed: &Path,
) -> Result<Option<PathBuf>, Error> {
    let uncompressed_candidate = dir.join(rel_path_uncompressed);
    let found_uncompressed = self.check_file_exists(&uncompressed_candidate).await;
    let found_path = if found_uncompressed {
        uncompressed_candidate
    } else {
        let compressed_candidate = dir.join(rel_path_compressed);
        if !self.check_file_exists(&compressed_candidate).await {
            return Ok(None);
        }
        compressed_candidate
    };

    if let Some(observer) = self.observer.as_deref() {
        observer.on_file_accessed(&found_path);
    }
    if found_uncompressed {
        return Ok(Some(found_path));
    }

    // Compressed hit: decide where the extracted file goes, copying the
    // compressed file to the mid-level caches along the way.
    let extraction_cache = match parent_cache_paths.split_first() {
        Some((bottom_most_cache, mid_level_caches)) => {
            self.copy_file_to_caches(rel_path_compressed, &found_path, mid_level_caches)
                .await;
            bottom_most_cache
        }
        None => &CachePath::DefaultDownstreamStore,
    };
    let extracted_path = self
        .extract_to_file_in_cache(
            CabDataSource::Filename(found_path),
            rel_path_uncompressed,
            extraction_cache,
        )
        .await?;
    Ok(Some(extracted_path))
}
/// Copies the file at `abs_path` to `rel_path` inside each of `caches`.
///
/// This is best-effort: a cache that cannot be resolved, whose directories
/// cannot be created, or to which the copy fails is skipped silently. The
/// observer is notified for every successful copy.
async fn copy_file_to_caches(&self, rel_path: &Path, abs_path: &Path, caches: &[CachePath]) {
    for cache_path in caches {
        let Some(cache_dir) = self.resolve_cache_path(cache_path) else {
            continue;
        };
        let Ok(dest_path) = self
            .make_dest_path_and_ensure_parent_dirs(rel_path, cache_dir)
            .await
        else {
            continue;
        };
        let Ok(copied_bytes) = tokio::fs::copy(abs_path, &dest_path).await else {
            continue;
        };
        if let Some(observer) = self.observer.as_deref() {
            observer.on_file_created(&dest_path, copied_bytes);
        }
    }
}
/// Returns `cache_path` joined with `rel_path`, after making sure that the
/// parent directory of that destination exists.
///
/// Fails with the I/O error from creating the directories.
async fn make_dest_path_and_ensure_parent_dirs(
    &self,
    rel_path: &Path,
    cache_path: &Path,
) -> Result<PathBuf, std::io::Error> {
    let destination = cache_path.join(rel_path);
    let parent_dir = destination.parent();
    if let Some(parent_dir) = parent_dir {
        tokio::fs::create_dir_all(parent_dir).await?;
    }
    Ok(destination)
}
/// Extracts the first file of a CAB archive to `rel_path` inside the given
/// cache and returns the path of the extracted file.
///
/// The archive is read either from a file on disk or from a cursor that is
/// fed by a concurrent download. The observer is told about the start,
/// progress, and outcome of the extraction.
///
/// Fails with `Error::NoDefaultDownstreamStore` if the cache cannot be
/// resolved, with an I/O error if the destination directory cannot be
/// created, and with `Error::CabExtraction` if the extraction fails.
async fn extract_to_file_in_cache(
&self,
cab_data_source: CabDataSource,
rel_path: &Path,
cache_path: &CachePath,
) -> Result<PathBuf, Error> {
let cache_path = self
.resolve_cache_path(cache_path)
.ok_or(Error::NoDefaultDownstreamStore)?;
let dest_path = self
.make_dest_path_and_ensure_parent_dirs(rel_path, cache_path)
.await?;
// Allocate an extraction ID and announce the extraction. The reporter
// tells the observer "canceled" if it is dropped without an outcome.
let notifier = {
let observer = self.observer.clone();
let extraction_id =
NEXT_DOWNLOAD_OR_EXTRACTION_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if let Some(observer) = observer.as_deref() {
observer.on_new_cab_extraction(extraction_id, &dest_path);
}
ExtractionStatusReporter::new(extraction_id, observer)
};
let extraction_id = notifier.extraction_id();
// Separate handle for the blocking task, which reports progress.
let observer = self.observer.clone();
// NOTE(review): `create_file_cleanly` is defined in the `file_creation`
// module. The first callback writes the file; the second one presumably
// runs when the file already exists or was created by someone else.
// TODO confirm.
let extracted_size_result = create_file_cleanly(
&dest_path,
|mut dest_file: std::fs::File| async {
// CAB decoding is synchronous, so it runs on the blocking thread pool.
tokio::task::spawn_blocking(move || match cab_data_source {
CabDataSource::Filename(compressed_input_path) => {
let file = std::fs::File::open(compressed_input_path)
.map_err(CabExtractionError::CouldNotOpenCabFile)?;
let buf_read = BufReader::new(file);
extract_cab_to_file(extraction_id, buf_read, &mut dest_file, observer)
}
CabDataSource::Cursor(cursor) => {
extract_cab_to_file(extraction_id, cursor, &mut dest_file, observer)
}
})
.await
.expect("task panicked")
},
|| async {
// NOTE(review): this is a blocking metadata call inside an async block.
let size = std::fs::metadata(&dest_path)
.map_err(|_| {
CabExtractionError::Other(
"Could not get size of existing extracted file".into(),
)
})?
.len();
Ok(size)
},
)
.await;
let extracted_size = match extracted_size_result {
Ok(size) => size,
Err(e) => {
// Build the returned error from the display text first, because `e`
// is consumed when it is handed to the observer below.
let error = Error::CabExtraction(format!("{}", e));
match e {
CleanFileCreationError::CallbackIndicatedError(e) => {
notifier.extraction_failed(e);
}
_ => {
notifier.extraction_failed(CabExtractionError::FileWriting(e.into()));
}
}
return Err(error);
}
};
notifier.extraction_completed(extracted_size, Instant::now());
if let Some(observer) = self.observer.as_deref() {
observer.on_file_created(&dest_path, extracted_size);
}
Ok(dest_path)
}
/// Sends a GET request for `url` and waits for the response headers.
///
/// A new download ID is allocated and announced to the observer before the
/// request is sent. On success, the status reporter for this download and
/// the response are returned. On failure (client could not be built,
/// request error, or error status code) the failure is reported to the
/// observer and `None` is returned.
async fn prepare_download_of_file(
    &self,
    url: &str,
) -> Option<(DownloadStatusReporter, reqwest::Response)> {
    let download_id =
        NEXT_DOWNLOAD_OR_EXTRACTION_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    if let Some(observer) = self.observer.as_deref() {
        observer.on_new_download_before_connect(download_id, url);
    }
    let reporter = DownloadStatusReporter::new(download_id, self.observer.clone());

    let client = match &self.reqwest_client {
        Ok(client) => client,
        Err(build_error) => {
            let reason = DownloadError::ClientCreationFailed(build_error.to_string());
            reporter.download_failed(reason);
            return None;
        }
    };

    // The `Accept-Encoding` header is set by hand here.
    let send_result = client
        .get(url)
        .header("Accept-Encoding", "gzip")
        .send()
        .await;
    match send_result.and_then(|response| response.error_for_status()) {
        Ok(response) => Some((reporter, response)),
        Err(request_error) => {
            reporter.download_failed(DownloadError::from(request_error));
            None
        }
    }
}
/// Writes the body of `response` to `rel_path` inside `cache_dir` and returns
/// the path of the written file.
///
/// If `chunk_consumer` is given, every chunk that is written to the file is
/// also fed to it, and it is marked complete once the body has been read to
/// the end. The observer is told about start, progress, and outcome. On any
/// failure the reason is reported through `reporter` and `None` is returned.
async fn download_file_to_cache(
&self,
reporter: DownloadStatusReporter,
response: reqwest::Response,
rel_path: &Path,
cache_dir: &Path,
mut chunk_consumer: Option<RemotelyFedCursorFeeder>,
) -> Option<PathBuf> {
// The response headers have already arrived when this method is called.
let ts_after_status = Instant::now();
let download_id = reporter.download_id();
if let Some(observer) = self.observer.as_deref() {
observer.on_download_started(download_id);
}
let dest_path = match self
.make_dest_path_and_ensure_parent_dirs(rel_path, cache_dir)
.await
{
Ok(dest_path) => dest_path,
Err(_e) => {
reporter.download_failed(DownloadError::CouldNotCreateDestinationDirectory);
return None;
}
};
// Separate handle that is moved into the progress callback.
let observer = self.observer.clone();
let mut stream = match response_to_uncompressed_stream_with_progress(
response,
move |bytes_so_far, total_bytes| {
if let Some(observer) = observer.as_deref() {
observer.on_download_progress(download_id, bytes_so_far, total_bytes)
}
},
) {
Ok(stream) => stream,
Err(download::Error::UnexpectedContentEncoding(encoding)) => {
reporter.download_failed(DownloadError::UnexpectedContentEncoding(encoding));
return None;
}
};
// NOTE(review): `create_file_cleanly` is defined in the `file_creation`
// module. The first callback writes the file; the second one presumably
// runs when the file already exists or was created by someone else.
// TODO confirm.
let download_result: Result<u64, CleanFileCreationError<std::io::Error>> =
create_file_cleanly(
&dest_path,
|dest_file: std::fs::File| async move {
let mut dest_file = tokio::fs::File::from_std(dest_file);
// 2 MiB transfer buffer.
let mut buf = vec![0u8; 2 * 1024 * 1024];
let mut uncompressed_size_in_bytes = 0;
loop {
let count = stream.read(&mut buf).await?;
// A read of zero bytes marks the end of the stream.
if count == 0 {
break;
}
uncompressed_size_in_bytes += count as u64;
dest_file.write_all(&buf[..count]).await?;
if let Some(chunk_consumer) = &mut chunk_consumer {
chunk_consumer.feed(&buf[..count]);
}
}
// NOTE(review): `mark_complete` is only reached when the loop ends
// normally; on a read or write error the consumer is dropped without it.
if let Some(chunk_consumer) = &mut chunk_consumer {
chunk_consumer.mark_complete();
}
dest_file.flush().await?;
Ok(uncompressed_size_in_bytes)
},
|| async {
// NOTE(review): this is a blocking metadata call inside an async block.
let size = std::fs::metadata(&dest_path)?.len();
Ok(size)
},
)
.await;
let uncompressed_size_in_bytes = match download_result {
Ok(size) => size,
// An error returned by one of the callbacks above.
Err(CleanFileCreationError::CallbackIndicatedError(e)) => {
reporter.download_failed(DownloadError::ErrorDuringDownloading(e));
return None;
}
// Any other failure of the file creation helper.
Err(e) => {
reporter.download_failed(DownloadError::ErrorWhileWritingDownloadedFile(e.into()));
return None;
}
};
let ts_after_download = Instant::now();
reporter.download_completed(
uncompressed_size_in_bytes,
ts_after_status,
ts_after_download,
);
if let Some(observer) = self.observer.as_deref() {
observer.on_file_created(&dest_path, uncompressed_size_in_bytes);
}
Some(dest_path)
}
}
/// Where the bytes of a CAB archive come from.
enum CabDataSource {
/// A CAB file on disk, given by its path.
Filename(PathBuf),
/// A cursor whose contents are supplied by a concurrently running download.
Cursor(RemotelyFedCursor),
}
/// Returns the name and the uncompressed size of the first file entry in the
/// cabinet, or `None` if no folder contains a file.
///
/// Folders are visited in order; for each folder only its first file entry
/// is considered.
fn get_first_file_entry<R: Read + Seek>(cabinet: &mut cab::Cabinet<R>) -> Option<(String, u64)> {
    cabinet.folder_entries().find_map(|folder| {
        let first_file = folder.file_entries().next()?;
        let name = first_file.name().to_owned();
        let uncompressed_size = u64::from(first_file.uncompressed_size());
        Some((name, uncompressed_size))
    })
}
/// Extracts the first file of the CAB archive in `source_data` into
/// `dest_file` and returns the number of bytes written.
///
/// After every chunk that is written, the observer (if any) receives a
/// progress notification with the bytes written so far and the uncompressed
/// size recorded in the archive.
///
/// # Errors
///
/// - `CabParsing` if the archive cannot be parsed or the file entry cannot
///   be opened for reading.
/// - `EmptyCab` if the archive contains no file entry.
/// - `CabReading` if reading decompressed data fails.
/// - `FileWriting` if writing to `dest_file` fails.
fn extract_cab_to_file<R: Read + Seek>(
    extraction_id: u64,
    source_data: R,
    dest_file: &mut std::fs::File,
    observer: Option<Arc<dyn SymsrvObserver>>,
) -> Result<u64, CabExtractionError> {
    use CabExtractionError::*;
    let mut cabinet = cab::Cabinet::new(source_data).map_err(CabParsing)?;
    let (file_entry_name, file_extracted_size) =
        get_first_file_entry(&mut cabinet).ok_or(EmptyCab)?;
    let mut reader = cabinet.read_file(&file_entry_name).map_err(CabParsing)?;

    // One buffer for the whole extraction; it does not need to be re-zeroed
    // for every chunk.
    let mut buf = [0u8; 4096];
    let mut bytes_written: u64 = 0;
    loop {
        let bytes_read = match reader.read(&mut buf) {
            Ok(0) => break,
            Ok(count) => count,
            // An interrupted read is non-fatal and should simply be retried.
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(CabReading(e)),
        };
        dest_file
            .write_all(&buf[..bytes_read])
            .map_err(FileWriting)?;
        bytes_written += bytes_read as u64;
        if let Some(observer) = observer.as_deref() {
            observer.on_cab_extraction_progress(extraction_id, bytes_written, file_extracted_size);
        }
    }
    Ok(bytes_written)
}
/// Builds a URL from `base_url` and the given path components.
///
/// Trailing slashes of `base_url` are removed, then the components are
/// appended, each preceded by a single `/`. With no components the result is
/// the trimmed base URL followed by `/`. Components are converted to text
/// lossily and are not percent-encoded.
fn url_join(base_url: &str, components: std::path::Components) -> String {
    let mut url = String::from(base_url.trim_end_matches('/'));
    url.push('/');
    for (index, component) in components.enumerate() {
        if index > 0 {
            url.push('/');
        }
        url.push_str(&component.as_os_str().to_string_lossy());
    }
    url
}
fn create_compressed_path(uncompressed_path: &Path) -> Result<PathBuf, Error> {
let uncompressed_ext = match uncompressed_path.extension() {
Some(ext) => match ext.to_string_lossy().deref() {
"exe" => "ex_",
"dll" => "dl_",
"pdb" => "pd_",
"dbg" => "db_",
_ => return Err(Error::UnrecognizedExtension),
},
None => return Err(Error::NoExtension),
};
let mut compressed_path = uncompressed_path.to_owned();
compressed_path.set_extension(uncompressed_ext);
Ok(compressed_path)
}
/// Reports the outcome of one download to the observer.
///
/// If the reporter is dropped before an outcome was reported, the observer is
/// told that the download was canceled.
struct DownloadStatusReporter {
// `Some` until an outcome has been reported; `None` suppresses the
// "canceled" notification on drop.
download_id: Option<u64>,
observer: Option<Arc<dyn SymsrvObserver>>,
// Creation time of the reporter; reference point for reported durations.
ts_before_connect: Instant,
}
impl DownloadStatusReporter {
    /// Creates a reporter for the given download and records the current
    /// time as the reference point for the reported durations.
    pub fn new(download_id: u64, observer: Option<Arc<dyn SymsrvObserver>>) -> Self {
        let ts_before_connect = Instant::now();
        Self {
            download_id: Some(download_id),
            observer,
            ts_before_connect,
        }
    }

    /// Returns the ID of the download that this reporter belongs to.
    pub fn download_id(&self) -> u64 {
        self.download_id.unwrap()
    }

    /// Consumes the reporter and tells the observer that the download failed.
    pub fn download_failed(mut self, e: DownloadError) {
        if let Some(observer) = self.observer.as_deref() {
            if let Some(download_id) = self.download_id {
                observer.on_download_failed(download_id, e);
            }
        }
        // Cleared last so that dropping `self` does not report "canceled".
        self.download_id = None;
    }

    /// Consumes the reporter and tells the observer that the download
    /// completed. Both timestamps are converted to durations relative to the
    /// creation time of the reporter.
    pub fn download_completed(
        mut self,
        uncompressed_size_in_bytes: u64,
        ts_after_headers: Instant,
        ts_after_completed: Instant,
    ) {
        if let Some(observer) = self.observer.as_deref() {
            if let Some(download_id) = self.download_id {
                let started_at = self.ts_before_connect;
                observer.on_download_completed(
                    download_id,
                    uncompressed_size_in_bytes,
                    ts_after_headers.duration_since(started_at),
                    ts_after_completed.duration_since(started_at),
                );
            }
        }
        // Cleared last so that dropping `self` does not report "canceled".
        self.download_id = None;
    }
}
/// Reports the download as canceled if no outcome was reported before the
/// reporter is dropped.
impl Drop for DownloadStatusReporter {
    fn drop(&mut self) {
        let Some(download_id) = self.download_id else {
            return;
        };
        if let Some(observer) = self.observer.as_deref() {
            observer.on_download_canceled(download_id);
        }
    }
}
/// Reports the outcome of one CAB extraction to the observer.
///
/// If the reporter is dropped before an outcome was reported, the observer is
/// told that the extraction was canceled.
struct ExtractionStatusReporter {
// `Some` until an outcome has been reported; `None` suppresses the
// "canceled" notification on drop.
extraction_id: Option<u64>,
observer: Option<Arc<dyn SymsrvObserver>>,
// Creation time of the reporter; reference point for the reported duration.
ts_before_start: Instant,
}
impl ExtractionStatusReporter {
    /// Creates a reporter for the given extraction and records the current
    /// time as the reference point for the reported duration.
    pub fn new(extraction_id: u64, observer: Option<Arc<dyn SymsrvObserver>>) -> Self {
        let ts_before_start = Instant::now();
        Self {
            extraction_id: Some(extraction_id),
            observer,
            ts_before_start,
        }
    }

    /// Returns the ID of the extraction that this reporter belongs to.
    pub fn extraction_id(&self) -> u64 {
        self.extraction_id.unwrap()
    }

    /// Consumes the reporter and tells the observer that the extraction
    /// failed.
    pub fn extraction_failed(mut self, e: CabExtractionError) {
        if let Some(observer) = self.observer.as_deref() {
            if let Some(extraction_id) = self.extraction_id {
                observer.on_cab_extraction_failed(extraction_id, e);
            }
        }
        // Cleared last so that dropping `self` does not report "canceled".
        self.extraction_id = None;
    }

    /// Consumes the reporter and tells the observer that the extraction
    /// completed. The timestamp is converted to a duration relative to the
    /// creation time of the reporter.
    pub fn extraction_completed(
        mut self,
        uncompressed_size_in_bytes: u64,
        ts_after_completed: Instant,
    ) {
        if let Some(observer) = self.observer.as_deref() {
            if let Some(extraction_id) = self.extraction_id {
                let elapsed = ts_after_completed.duration_since(self.ts_before_start);
                observer.on_cab_extraction_completed(
                    extraction_id,
                    uncompressed_size_in_bytes,
                    elapsed,
                );
            }
        }
        // Cleared last so that dropping `self` does not report "canceled".
        self.extraction_id = None;
    }
}
/// Reports the extraction as canceled if no outcome was reported before the
/// reporter is dropped.
impl Drop for ExtractionStatusReporter {
    fn drop(&mut self) {
        let Some(extraction_id) = self.extraction_id else {
            return;
        };
        if let Some(observer) = self.observer.as_deref() {
            observer.on_cab_extraction_canceled(extraction_id);
        }
    }
}