use std::future::Future;
use std::time::Duration;
use futures_util::future::{select, Either};
use crate::cancellation::CancellationToken;
use crate::error::StoreError;
use crate::models::catalog::{
Availability, DisplayCatalogModel, Image, Package, Price, Product, ProductLocalizedProperty,
Sku,
};
use crate::models::enums::{DCatEndpoint, DeviceFamily, DisplayCatalogResult, IdentifierType};
use crate::models::fe3::PackageInstance;
use crate::models::locale::Locale;
use crate::models::search::DCatSearch;
use crate::services::fe3::FE3Handler;
use crate::utilities::helpers::{create_dcat_uri, endpoint_to_search_url};
use crate::utilities::sleep::sleep;
use log::{debug, error, info, warn};
/// Tunable HTTP behaviour for the catalog client: timeout, retry budget,
/// backoff schedule, retryable status codes and the `User-Agent` string.
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Request timeout handed to the HTTP client builder.
    pub timeout: Duration,
    /// Number of retries permitted after the first attempt.
    pub max_retries: u32,
    /// Delay before the first retry; doubled for every later retry.
    pub initial_backoff: Duration,
    /// Upper bound applied to every computed backoff delay.
    pub max_backoff: Duration,
    /// HTTP status codes that are treated as retryable.
    pub retry_on_status: Vec<u16>,
    /// Value passed to the HTTP client builder as the user agent.
    pub user_agent: String,
}

impl Default for ClientConfig {
    /// 30 s timeout, 3 retries, 500 ms initial / 5 s maximum backoff,
    /// retry on 408/429/502/503/504, user agent `StoreLib`.
    fn default() -> Self {
        const RETRYABLE: [u16; 5] = [408, 429, 502, 503, 504];
        Self {
            user_agent: String::from("StoreLib"),
            retry_on_status: RETRYABLE.to_vec(),
            max_backoff: Duration::from_secs(5),
            initial_backoff: Duration::from_millis(500),
            max_retries: 3,
            timeout: Duration::from_secs(30),
        }
    }
}

impl ClientConfig {
    /// Returns `initial_backoff * 2^attempt`, capped at `max_backoff`.
    ///
    /// All arithmetic saturates, so arbitrarily large `attempt` values are
    /// safe and simply yield `max_backoff`.
    pub(crate) fn backoff_for(&self, attempt: u32) -> Duration {
        // `checked_shl` yields `None` once the shift reaches the bit width.
        let multiplier = match 1u64.checked_shl(attempt) {
            Some(power) => u128::from(power),
            None => u128::from(u64::MAX),
        };
        let scaled = self.initial_backoff.as_nanos().saturating_mul(multiplier);
        // `Duration::from_nanos` takes a u64, so clamp before converting.
        let clamped = u64::try_from(scaled).unwrap_or(u64::MAX);
        Duration::from_nanos(clamped).min(self.max_backoff)
    }
}
/// Drives `op` to completion unless the cancellation token fires first.
///
/// With `cancel == None` the operation is simply awaited. Otherwise the
/// operation is raced against `tok.cancelled()`; if the cancellation future
/// wins, `Err(StoreError::Cancelled)` is returned and `op` is dropped.
pub(crate) async fn race_cancel<F, T>(
    op: F,
    cancel: Option<&CancellationToken>,
) -> Result<T, StoreError>
where
    F: Future<Output = Result<T, StoreError>>,
{
    // No token: nothing to race against.
    let Some(token) = cancel else {
        return op.await;
    };

    // `select` needs `Unpin` futures, so pin both on the stack.
    let operation = std::pin::pin!(op);
    let cancelled = std::pin::pin!(token.cancelled());
    match select(operation, cancelled).await {
        Either::Left((outcome, _)) => outcome,
        Either::Right(_) => Err(StoreError::Cancelled),
    }
}
/// Sends the request produced by `make_req`, retrying according to `cfg`.
///
/// Up to `cfg.max_retries + 1` attempts are made. Before every attempt after
/// the first, `on_progress` receives `"retry.wait"`, the task sleeps for
/// `cfg.backoff_for(..)` (abortable through `cancel`), and `on_progress`
/// then receives `"retry.attempt"`.
///
/// * A response whose status is listed in `cfg.retry_on_status` is retried
///   while attempts remain; once they run out that response is returned `Ok`.
/// * Send errors are retried while attempts remain; the final one is returned.
///   Timeouts are mapped to `StoreError::TimedOut`, others to `StoreError::Http`.
/// * Cancellation is never retried and surfaces as `StoreError::Cancelled`.
pub(crate) async fn send_with_retry<F, P>(
    make_req: F,
    cfg: &ClientConfig,
    cancel: Option<&CancellationToken>,
    on_progress: P,
) -> Result<reqwest::Response, StoreError>
where
    F: Fn() -> reqwest::RequestBuilder,
    P: Fn(&'static str, String),
{
    let total_attempts = cfg.max_retries.saturating_add(1);
    let mut last_err: Option<StoreError> = None;

    for attempt in 0..total_attempts {
        // 1-based attempt number; cannot overflow because attempt < total_attempts.
        let attempt_no = attempt + 1;
        let may_retry = attempt_no < total_attempts;

        if attempt > 0 {
            let backoff = cfg.backoff_for(attempt - 1);
            on_progress(
                "retry.wait",
                format!(
                    "waiting {}ms before attempt {}/{}",
                    backoff.as_millis(),
                    attempt_no,
                    total_attempts,
                ),
            );
            // The backoff sleep itself must be cancellable.
            let pause = async {
                sleep(backoff).await;
                Ok::<(), StoreError>(())
            };
            race_cancel(pause, cancel).await?;
            on_progress(
                "retry.attempt",
                format!("attempt {}/{}", attempt_no, total_attempts),
            );
        }

        let pending = make_req().send();
        let outcome = race_cancel(
            async {
                match pending.await {
                    Ok(resp) => Ok(resp),
                    Err(e) if e.is_timeout() => Err(StoreError::TimedOut),
                    Err(e) => Err(StoreError::Http(e)),
                }
            },
            cancel,
        )
        .await;

        let failure = match outcome {
            Ok(resp) => {
                let code = resp.status().as_u16();
                if may_retry && cfg.retry_on_status.contains(&code) {
                    continue;
                }
                return Ok(resp);
            }
            Err(StoreError::Cancelled) => return Err(StoreError::Cancelled),
            Err(other) => other,
        };

        if !may_retry {
            return Err(failure);
        }
        last_err = Some(failure);
    }

    // Only reachable if the loop body never ran.
    Err(last_err.unwrap_or_else(|| StoreError::Other("retries exhausted".into())))
}
/// One progress notification handed to the registered progress callback.
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ProgressEvent {
/// Short machine-readable stage identifier, e.g. `"dcat.request"`.
pub stage: &'static str,
/// Free-form detail text for the stage.
pub message: String,
/// Current item number for counted events; `None` for plain events.
pub current: Option<u32>,
/// Total item count for counted events; `None` for plain events.
pub total: Option<u32>,
}
/// Boxed progress listener. On non-wasm targets it must be `Send + Sync`.
#[cfg(not(target_arch = "wasm32"))]
pub type ProgressCallback = Box<dyn Fn(ProgressEvent) + Send + Sync + 'static>;
/// Boxed progress listener. On wasm32 the `Send + Sync` bounds are dropped.
#[cfg(target_arch = "wasm32")]
pub type ProgressCallback = Box<dyn Fn(ProgressEvent) + 'static>;
/// Client for DisplayCatalog queries that also caches the outcome of the
/// most recent query in its public fields.
pub struct DisplayCatalogHandler {
/// Parsed model from the last successful product / batch query.
pub product_listing: Option<DisplayCatalogModel>,
/// NOTE(review): initialised to `None` and not written by the code visible here.
pub error: Option<String>,
/// Endpoint used to build query and search URLs.
pub selected_endpoint: DCatEndpoint,
/// Outcome of the last query; reset to `None` when a product query starts.
pub result: Option<DisplayCatalogResult>,
/// Initialised to `DeviceFamily::Desktop`; searches take their own family argument.
pub device_family: DeviceFamily,
/// NOTE(review): initialised to `None` and not written by the code visible here.
pub search_result: Option<DCatSearch>,
/// Identifier of the last single-product query; `None` after a batch query starts.
pub id: Option<String>,
/// Locale used to build product query URLs.
pub selected_locale: Locale,
/// `true` once the last product query produced at least one product.
pub is_found: bool,
// Shared HTTP client built from `config`.
client: reqwest::Client,
// Timeout / retry settings applied to every request.
pub(crate) config: ClientConfig,
// Optional listener that receives `ProgressEvent`s.
progress: Option<ProgressCallback>,
}
impl DisplayCatalogHandler {
/// Creates a handler for `endpoint` / `locale` using `ClientConfig::default()`.
pub fn new(endpoint: DCatEndpoint, locale: Locale) -> Self {
    let config = ClientConfig::default();
    Self::with_config(endpoint, locale, config)
}
pub fn with_config(endpoint: DCatEndpoint, locale: Locale, config: ClientConfig) -> Self {
let client = Self::build_client(&config);
DisplayCatalogHandler {
product_listing: None,
error: None,
selected_endpoint: endpoint,
result: None,
device_family: DeviceFamily::Desktop,
search_result: None,
id: None,
selected_locale: locale,
is_found: false,
client,
config,
progress: None,
}
}
pub fn config(&self) -> &ClientConfig {
&self.config
}
/// Installs `cb` as the progress listener, replacing any previous one.
pub fn set_progress_callback(&mut self, cb: ProgressCallback) {
    drop(self.progress.replace(cb));
}
/// Removes the progress listener, if any; later events are discarded.
pub fn clear_progress_callback(&mut self) {
    drop(self.progress.take());
}
/// Sends a plain (uncounted) progress event to the listener, if one is set.
///
/// `message` is only converted when a listener exists.
pub(crate) fn emit(&self, stage: &'static str, message: impl Into<String>) {
    let Some(listener) = self.progress.as_ref() else {
        return;
    };
    let event = ProgressEvent {
        stage,
        message: message.into(),
        current: None,
        total: None,
    };
    listener(event);
}
/// Sends a counted progress event (`current` of `total`) to the listener,
/// if one is set.
pub(crate) fn emit_counter(
    &self,
    stage: &'static str,
    message: impl Into<String>,
    current: u32,
    total: u32,
) {
    let Some(listener) = self.progress.as_ref() else {
        return;
    };
    let event = ProgressEvent {
        stage,
        message: message.into(),
        current: Some(current),
        total: Some(total),
    };
    listener(event);
}
/// Shorthand for a handler on the production endpoint with the production locale.
pub fn production() -> Self {
    let endpoint = DCatEndpoint::Production;
    let locale = Locale::production();
    Self::new(endpoint, locale)
}
/// Returns the primary product of the cached listing.
///
/// Prefers the first entry of `products`; falls back to the single
/// `product` field when the list is absent or empty.
pub fn product(&self) -> Option<&Product> {
    let listing = self.product_listing.as_ref()?;
    let head = listing.products.as_deref().and_then(|all| all.first());
    match head {
        Some(first) => Some(first),
        None => listing.product.as_ref(),
    }
}
/// Returns the first localized-property entry of the primary product.
pub fn localized(&self) -> Option<&ProductLocalizedProperty> {
    let entries = self.product()?.localized_properties.as_deref()?;
    entries.first()
}
/// Product title from the first localized-property entry, if present.
pub fn title(&self) -> Option<&str> {
    self.localized()
        .and_then(|entry| entry.product_title.as_deref())
}
/// Product description from the first localized-property entry, if present.
pub fn description(&self) -> Option<&str> {
    self.localized()
        .and_then(|entry| entry.product_description.as_deref())
}
/// Publisher name from the first localized-property entry, if present.
pub fn publisher_name(&self) -> Option<&str> {
    self.localized()
        .and_then(|entry| entry.publisher_name.as_deref())
}
/// Collects every image of the primary product whose `image_purpose`
/// equals `purpose`, across all localized-property entries, in order.
pub fn images_with_purpose(&self, purpose: &str) -> Vec<&Image> {
    let mut matching = Vec::new();
    let Some(localized) = self
        .product()
        .and_then(|p| p.localized_properties.as_deref())
    else {
        return matching;
    };
    for entry in localized {
        let Some(images) = entry.images.as_deref() else {
            continue;
        };
        for image in images {
            if image.image_purpose.as_deref() == Some(purpose) {
                matching.push(image);
            }
        }
    }
    matching
}
/// Returns the SKU of the first display-SKU-availability entry, if any.
pub fn sku(&self) -> Option<&Sku> {
    let entries = self.product()?.display_sku_availabilities.as_deref()?;
    let first_entry = entries.first()?;
    first_entry.sku.as_ref()
}
/// Flattens the availabilities of every display-SKU-availability entry of
/// the primary product into one list, preserving order.
pub fn availabilities(&self) -> Vec<&Availability> {
    let mut collected = Vec::new();
    let Some(entries) = self
        .product()
        .and_then(|p| p.display_sku_availabilities.as_deref())
    else {
        return collected;
    };
    for entry in entries {
        if let Some(list) = entry.availabilities.as_deref() {
            collected.extend(list.iter());
        }
    }
    collected
}
/// Returns the price of every availability that carries order-management
/// data with a price, in availability order.
pub fn prices(&self) -> Vec<&Price> {
    let mut found = Vec::new();
    for availability in self.availabilities() {
        let Some(order_data) = availability.order_management_data.as_ref() else {
            continue;
        };
        if let Some(price) = order_data.price.as_ref() {
            found.push(price);
        }
    }
    found
}
/// Returns the first entry of [`Self::prices`], if any.
pub fn price(&self) -> Option<&Price> {
    let all = self.prices();
    all.first().copied()
}
/// Returns the packages listed in the first SKU's properties, or an empty
/// slice when any link in that chain is missing.
pub fn packages(&self) -> &[Package] {
    let Some(properties) = self.sku().and_then(|sku| sku.properties.as_ref()) else {
        return &[];
    };
    properties.packages.as_deref().unwrap_or(&[])
}
/// Returns the `wu_category_id` from the first SKU's fulfillment data, if present.
pub fn wu_category_id(&self) -> Option<&str> {
    self.sku()
        .and_then(|sku| sku.properties.as_ref())
        .and_then(|properties| properties.fulfillment_data.as_ref())
        .and_then(|fulfillment| fulfillment.wu_category_id.as_deref())
}
/// Returns the primary product's `last_modified_date` string, if present.
pub fn last_modified_date(&self) -> Option<&str> {
    self.product()
        .and_then(|product| product.last_modified_date.as_deref())
}
fn build_client(config: &ClientConfig) -> reqwest::Client {
#[cfg(not(target_arch = "wasm32"))]
let builder = reqwest::Client::builder()
.user_agent(&config.user_agent)
.timeout(config.timeout);
#[cfg(target_arch = "wasm32")]
let builder = reqwest::Client::builder().user_agent(&config.user_agent);
builder.build().unwrap_or_default()
}
/// Queries a single product without cancellation support.
///
/// Equivalent to [`Self::query_dcat_with_cancel`] with no token.
pub async fn query_dcat(
    &mut self,
    id: &str,
    id_type: IdentifierType,
    auth_token: Option<&str>,
) -> Result<(), StoreError> {
    let no_cancel = None;
    self.query_dcat_with_cancel(id, id_type, auth_token, no_cancel)
        .await
}
/// Queries a single product, aborting with `StoreError::Cancelled` if
/// `cancel` fires before the query finishes.
pub async fn query_dcat_with_cancel(
    &mut self,
    id: &str,
    id_type: IdentifierType,
    auth_token: Option<&str>,
    cancel: Option<&CancellationToken>,
) -> Result<(), StoreError> {
    let query = self.query_dcat_inner(id, id_type, auth_token);
    race_cancel(query, cancel).await
}
/// Performs the single-product query and records the outcome on `self`.
///
/// * 2xx: the body is parsed into `product_listing`, `result` becomes
///   `Found` and `is_found` becomes `true`.
/// * 404: `result` becomes `NotFound` and `Err(StoreError::NotFound)` is returned.
/// * Any other status: `Err(StoreError::Other)` carrying status and body text.
///
/// NOTE(review): `product_listing` is not cleared when a query fails, so
/// accessors may still expose the previous product afterwards.
async fn query_dcat_inner(
    &mut self,
    id: &str,
    id_type: IdentifierType,
    auth_token: Option<&str>,
) -> Result<(), StoreError> {
    // Reset per-query state before any network traffic.
    self.is_found = false;
    self.result = None;
    self.id = Some(id.to_owned());

    let url = create_dcat_uri(&self.selected_endpoint, id, &id_type, &self.selected_locale);
    debug!("DCat query: GET {url}");
    self.emit("dcat.request", format!("GET id={id}"));

    // An empty token is treated the same as no token.
    let auth = match auth_token {
        Some(token) if !token.is_empty() => Some(token),
        _ => None,
    };
    let build_request = || {
        let request = self.client.get(&url);
        match auth {
            Some(token) => request.header("Authentication", token),
            None => request,
        }
    };
    let sent = send_with_retry(build_request, &self.config, None, |stage, msg| {
        self.emit(stage, msg)
    })
    .await;
    let response = match sent {
        Ok(response) => response,
        Err(err) => {
            if matches!(err, StoreError::TimedOut) {
                warn!("DCat query timed out for id={id}");
            }
            return Err(err);
        }
    };

    let status = response.status();
    debug!("DCat response: HTTP {status}");
    self.emit("dcat.response", format!("HTTP {status}"));

    if status == reqwest::StatusCode::NOT_FOUND {
        warn!("DCat: product not found (id={id})");
        self.emit("dcat.notFound", format!("id={id}"));
        self.result = Some(DisplayCatalogResult::NotFound);
        return Err(StoreError::NotFound);
    }
    if !status.is_success() {
        // Best effort: an unreadable body is reported as empty.
        let body = response.text().await.unwrap_or_default();
        return Err(StoreError::Other(format!(
            "Failed to query DisplayCatalog endpoint {:?}, status {}, body: {}",
            self.selected_endpoint, status, body
        )));
    }

    let body = response.text().await.map_err(StoreError::Http)?;
    debug!("DCat response body: {} bytes", body.len());
    self.emit("dcat.parse", format!("{} bytes", body.len()));

    let model: DisplayCatalogModel = match serde_json::from_str(&body) {
        Ok(parsed) => parsed,
        Err(e) => {
            error!("DCat JSON parse error: {e}");
            log_json_context(&body, e.column());
            return Err(StoreError::Json(e));
        }
    };

    // Same lookup order as `product()`: first list entry, then the single field.
    let primary = match model.products.as_deref().and_then(|v| v.first()) {
        Some(first) => Some(first),
        None => model.product.as_ref(),
    };
    let title = primary
        .and_then(|p| p.localized_properties.as_deref())
        .and_then(|v| v.first())
        .and_then(|lp| lp.product_title.as_deref())
        .unwrap_or("<no title>");
    info!("DCat found: \"{title}\" (id={id})");
    self.emit("dcat.done", format!("\"{title}\""));

    self.product_listing = Some(model);
    self.result = Some(DisplayCatalogResult::Found);
    self.is_found = true;
    Ok(())
}
/// Queries several products in one request, without cancellation support.
pub async fn query_dcat_batch(
    &mut self,
    ids: &[&str],
    auth_token: Option<&str>,
) -> Result<(), StoreError> {
    let no_cancel = None;
    self.query_dcat_batch_with_cancel(ids, auth_token, no_cancel)
        .await
}
/// Queries several products in one request, aborting with
/// `StoreError::Cancelled` if `cancel` fires first.
pub async fn query_dcat_batch_with_cancel(
    &mut self,
    ids: &[&str],
    auth_token: Option<&str>,
    cancel: Option<&CancellationToken>,
) -> Result<(), StoreError> {
    let query = self.query_dcat_batch_inner(ids, auth_token);
    race_cancel(query, cancel).await
}
/// Performs the batch query and records the outcome on `self`.
///
/// Rejects an empty `ids` slice up front. On a 2xx response the parsed model
/// is stored in `product_listing`, `result` becomes `Found`, and `is_found`
/// reflects whether at least one product was returned. Any other status
/// yields `StoreError::Other` with status and body text.
///
/// NOTE(review): `result` is `Found` even when zero products come back
/// (`is_found == false`) — confirm that is intended.
async fn query_dcat_batch_inner(
    &mut self,
    ids: &[&str],
    auth_token: Option<&str>,
) -> Result<(), StoreError> {
    if ids.is_empty() {
        return Err(StoreError::Other(
            "query_dcat_batch: ids must be non-empty".into(),
        ));
    }

    // Reset per-query state; a batch has no single id.
    self.is_found = false;
    self.result = None;
    self.id = None;

    let url = crate::utilities::helpers::create_dcat_batch_uri(
        &self.selected_endpoint,
        ids,
        &self.selected_locale,
    );
    debug!("DCat batch query: GET {url}");
    self.emit("dcat.request", format!("batch GET ({} ids)", ids.len()));

    // An empty token is treated the same as no token.
    let auth = match auth_token {
        Some(token) if !token.is_empty() => Some(token),
        _ => None,
    };
    let build_request = || {
        let request = self.client.get(&url);
        match auth {
            Some(token) => request.header("Authentication", token),
            None => request,
        }
    };
    let response = send_with_retry(build_request, &self.config, None, |stage, msg| {
        self.emit(stage, msg)
    })
    .await?;

    let status = response.status();
    debug!("DCat batch response: HTTP {status}");
    self.emit("dcat.response", format!("HTTP {status}"));

    if !status.is_success() {
        // Best effort: an unreadable body is reported as empty.
        let body = response.text().await.unwrap_or_default();
        return Err(StoreError::Other(format!(
            "Failed batch DisplayCatalog query for {} id(s), status {status}, body: {body}",
            ids.len(),
        )));
    }

    let body = response.text().await.map_err(StoreError::Http)?;
    self.emit("dcat.parse", format!("{} bytes", body.len()));

    let model: DisplayCatalogModel = match serde_json::from_str(&body) {
        Ok(parsed) => parsed,
        Err(e) => {
            error!("DCat batch JSON parse error: {e}");
            log_json_context(&body, e.column());
            return Err(StoreError::Json(e));
        }
    };

    let count = match model.products.as_deref() {
        Some(list) => list.len(),
        None => 0,
    };
    info!(
        "DCat batch: {count} product(s) for {} requested id(s)",
        ids.len()
    );
    self.emit("dcat.done", format!("{count} product(s)"));

    self.product_listing = Some(model);
    self.result = Some(DisplayCatalogResult::Found);
    self.is_found = count > 0;
    Ok(())
}
/// Returns every product of the cached listing's `products` list, or an
/// empty slice when there is no listing or no list.
pub fn products(&self) -> &[Product] {
    let Some(listing) = self.product_listing.as_ref() else {
        return &[];
    };
    listing.products.as_deref().unwrap_or(&[])
}
/// Resolves the packages of the cached product, without cancellation support.
pub async fn get_packages_for_product(
    &self,
    msa_token: Option<&str>,
) -> Result<Vec<PackageInstance>, StoreError> {
    let no_cancel = None;
    self.get_packages_for_product_with_cancel(msa_token, no_cancel)
        .await
}
/// Resolves the packages of the cached product, aborting with
/// `StoreError::Cancelled` if `cancel` fires first.
pub async fn get_packages_for_product_with_cancel(
    &self,
    msa_token: Option<&str>,
    cancel: Option<&CancellationToken>,
) -> Result<Vec<PackageInstance>, StoreError> {
    let resolution = self.get_packages_for_product_inner(msa_token);
    race_cancel(resolution, cancel).await
}
/// Resolves download information for the cached product's packages via FE3.
///
/// Steps, each reported through the progress callback:
/// 1. read the `WuCategoryId` of the cached product (error if missing);
/// 2. fetch a cookie and the sync-updates XML;
/// 3. parse update/revision ids and package instances from that XML;
/// 4. resolve one download URL per update id;
/// 5. fill each instance's `update_id`, `package_uri` and `file_size`,
///    pairing instances, update ids and URLs by index. When FE3 reports no
///    size, the size listed in the catalog for the same moniker is used.
async fn get_packages_for_product_inner(
    &self,
    msa_token: Option<&str>,
) -> Result<Vec<PackageInstance>, StoreError> {
    let product = self.product().ok_or_else(|| {
        StoreError::Other("Cannot get packages: product data is null.".into())
    })?;
    let wu_category_id = self.wu_category_id().ok_or_else(|| {
        StoreError::Other(
            "Cannot get packages: FulfillmentData (WuCategoryId) is missing.".into(),
        )
    })?;
    debug!("FE3: WuCategoryId={wu_category_id}");
    self.emit("fe3.start", format!("WuCategoryId={wu_category_id}"));

    self.emit("fe3.getCookie", "POST GetCookie");
    let cookie = FE3Handler::get_cookie(&self.client).await?;

    self.emit("fe3.syncUpdates", format!("cookie {} bytes", cookie.len()));
    let xml =
        FE3Handler::sync_updates_with_cookie(&cookie, wu_category_id, msa_token, &self.client)
            .await?;

    self.emit("fe3.parseUpdateIds", format!("{} bytes XML", xml.len()));
    let (update_ids, revision_ids) = FE3Handler::process_update_ids(&xml)?;
    let update_id_count = update_ids.len();
    debug!("FE3: {} update ID(s) parsed", update_id_count);
    self.emit_counter(
        "fe3.parseUpdateIds.done",
        "update IDs parsed",
        update_id_count as u32,
        update_id_count as u32,
    );

    self.emit("fe3.parsePackages", "parsing package instances");
    let mut instances = FE3Handler::get_package_instances(&xml).await?;
    let total_pkgs = instances.len() as u32;
    debug!("FE3: {} package instance(s) found", instances.len());
    self.emit_counter(
        "fe3.parsePackages.done",
        "package instances parsed",
        total_pkgs,
        total_pkgs,
    );

    // Announce each package; instances and update ids are paired by index.
    for (i, inst) in instances.iter().enumerate() {
        let uid = update_ids.get(i).map_or("", String::as_str);
        self.emit_counter(
            "fe3.packageFound",
            format!("{} | updateId={}", inst.package_moniker, uid),
            (i + 1) as u32,
            total_pkgs,
        );
    }

    self.emit(
        "fe3.resolveUrls",
        format!("resolving {} URLs", update_id_count),
    );

    // update id -> moniker, used only to label per-link progress events.
    let mut moniker_by_update_id: std::collections::HashMap<&str, &str> =
        std::collections::HashMap::new();
    for (uid, inst) in update_ids.iter().zip(instances.iter()) {
        moniker_by_update_id.insert(uid.as_str(), inst.package_moniker.as_str());
    }

    let progress = self.progress.as_ref();
    let urls = FE3Handler::get_file_urls_with_progress(
        &update_ids,
        &revision_ids,
        msa_token,
        &self.client,
        |idx, total, update_id, url, size| {
            if let Some(cb) = progress {
                let moniker = moniker_by_update_id
                    .get(update_id)
                    .copied()
                    .unwrap_or("<unknown>");
                let size_text = size.map(|s| s.to_string()).unwrap_or_else(|| "?".into());
                cb(ProgressEvent {
                    stage: "fe3.linkReceived",
                    message: format!(
                        "{moniker} | uri={url} | size={} | updateId={update_id}",
                        size_text,
                    ),
                    current: Some((idx + 1) as u32),
                    total: Some(total as u32),
                });
            }
        },
    )
    .await?;
    debug!("FE3: {} download URL(s) resolved", urls.len());
    self.emit_counter(
        "fe3.resolveUrls.done",
        "URLs resolved",
        urls.len() as u32,
        update_id_count as u32,
    );

    // package full name -> catalog-declared download size (fallback sizes).
    let mut dcat_size_map: std::collections::HashMap<&str, i64> =
        std::collections::HashMap::new();
    for dsa in product
        .display_sku_availabilities
        .as_deref()
        .into_iter()
        .flatten()
    {
        let listed = dsa
            .sku
            .as_ref()
            .and_then(|s| s.properties.as_ref())
            .and_then(|p| p.packages.as_deref())
            .unwrap_or(&[]);
        for pkg in listed {
            if let (Some(name), Some(size)) = (
                pkg.package_full_name.as_deref(),
                pkg.max_download_size_in_bytes,
            ) {
                dcat_size_map.insert(name, size);
            }
        }
    }

    debug!("DCat size map ({} entries):", dcat_size_map.len());
    for (name, size) in &dcat_size_map {
        debug!(" DCat package: {name} = {size} bytes");
    }
    debug!("FE3 package monikers ({} entries):", instances.len());
    for inst in &instances {
        debug!(" FE3 moniker: {}", inst.package_moniker);
    }

    for (i, instance) in instances.iter_mut().enumerate() {
        let resolved = urls.get(i);
        let fe3_size = resolved.and_then(|(_, s)| *s);
        let dcat_size = dcat_size_map
            .get(instance.package_moniker.as_str())
            .copied();

        instance.update_id = update_ids.get(i).cloned().unwrap_or_default();
        if let Some((url, reported_size)) = resolved {
            instance.package_uri = Some(url.clone());
            instance.file_size = *reported_size;
        }
        // Fall back to the catalog size when no size is known yet.
        if instance.file_size.is_none() {
            instance.file_size = dcat_size;
        }

        debug!(
            " package[{i}]: moniker={} fe3_size={:?} dcat_size={:?}",
            instance.package_moniker, fe3_size, dcat_size,
        );
        let size_text = instance
            .file_size
            .map(|s| s.to_string())
            .unwrap_or_else(|| "?".into());
        self.emit_counter(
            "fe3.packageResolved",
            format!(
                "{} | uri={} | size={} | updateId={}",
                instance.package_moniker,
                instance.package_uri.as_deref().unwrap_or("<none>"),
                size_text,
                instance.update_id,
            ),
            (i + 1) as u32,
            total_pkgs,
        );
    }

    info!("Resolved {} package(s)", instances.len());
    self.emit(
        "fe3.done",
        format!("{} package(s) resolved", instances.len()),
    );
    Ok(instances)
}
/// Searches the catalog and returns the first page of results.
pub async fn search_dcat(
    &mut self,
    query: &str,
    device_family: DeviceFamily,
) -> Result<DCatSearch, StoreError> {
    const FIRST_PAGE: u32 = 0;
    self.search_dcat_paged(query, device_family, FIRST_PAGE).await
}
/// Searches the catalog (first page), aborting with `StoreError::Cancelled`
/// if `cancel` fires first.
pub async fn search_dcat_with_cancel(
    &mut self,
    query: &str,
    device_family: DeviceFamily,
    cancel: Option<&CancellationToken>,
) -> Result<DCatSearch, StoreError> {
    const FIRST_PAGE: u32 = 0;
    self.search_dcat_paged_with_cancel(query, device_family, FIRST_PAGE, cancel)
        .await
}
/// Searches the catalog, skipping the first `skip_count` items; no
/// cancellation support.
pub async fn search_dcat_paged(
    &mut self,
    query: &str,
    device_family: DeviceFamily,
    skip_count: u32,
) -> Result<DCatSearch, StoreError> {
    let no_cancel = None;
    self.search_dcat_paged_with_cancel(query, device_family, skip_count, no_cancel)
        .await
}
/// Searches the catalog, skipping the first `skip_count` items, and aborts
/// with `StoreError::Cancelled` if `cancel` fires first.
pub async fn search_dcat_paged_with_cancel(
    &mut self,
    query: &str,
    device_family: DeviceFamily,
    skip_count: u32,
    cancel: Option<&CancellationToken>,
) -> Result<DCatSearch, StoreError> {
    let search = self.search_dcat_paged_inner(query, device_family, skip_count);
    race_cancel(search, cancel).await
}
/// Runs one search request and parses the response into a [`DCatSearch`].
///
/// The URL is the endpoint's search base followed by `query`, the fixed
/// `productFamilyNames=apps,games` filter, the platform dependency name of
/// `device_family`, and `skipItems` when `skip_count > 0`.
///
/// `self.result` is set to `Found` only after the body has been parsed
/// successfully; a transport, status or JSON failure leaves it untouched.
///
/// # Errors
/// * whatever `send_with_retry` returns for a failed send;
/// * `StoreError::Other` for a non-2xx status (includes status and body);
/// * `StoreError::Http` if the body cannot be read;
/// * `StoreError::Json` if the body is not a valid search result.
async fn search_dcat_paged_inner(
    &mut self,
    query: &str,
    device_family: DeviceFamily,
    skip_count: u32,
) -> Result<DCatSearch, StoreError> {
    let base = endpoint_to_search_url(&self.selected_endpoint);
    let dep = device_family.platform_dependency_name();
    // NOTE(review): `query` is spliced in verbatim; characters such as `&` or
    // `#` would alter the URL. Confirm whether callers pre-encode it.
    let mut url = format!(
        "{}{}&productFamilyNames=apps,games&platformDependencyName={}",
        base, query, dep
    );
    if skip_count > 0 {
        url.push_str(&format!("&skipItems={}", skip_count));
    }
    debug!("DCat search: GET {url}");
    self.emit("search.request", format!("\"{query}\""));

    let response = send_with_retry(
        || self.client.get(&url),
        &self.config,
        None,
        |stage, msg| self.emit(stage, msg),
    )
    .await?;

    let status = response.status();
    debug!("DCat search response: HTTP {status}");
    self.emit("search.response", format!("HTTP {status}"));

    if !status.is_success() {
        // Best effort: an unreadable body is reported as empty.
        let body = response.text().await.unwrap_or_default();
        return Err(StoreError::Other(format!(
            "Failed to search DisplayCatalog for {:?}, status {}, body: {}",
            device_family, status, body
        )));
    }

    let body = response.text().await.map_err(StoreError::Http)?;
    debug!("DCat search response body: {} bytes", body.len());
    self.emit("search.parse", format!("{} bytes", body.len()));

    let result: DCatSearch = serde_json::from_str(&body).map_err(|e| {
        error!("DCat search JSON parse error: {e}");
        log_json_context(&body, e.column());
        StoreError::Json(e)
    })?;
    // Only a body that actually parsed counts as a successful search.
    self.result = Some(DisplayCatalogResult::Found);

    let count = result.total_result_count.unwrap_or(0);
    info!("DCat search: {count} result(s) for \"{query}\"");
    self.emit("search.done", format!("{count} result(s)"));
    Ok(result)
}
}
/// Logs the text surrounding a JSON parse failure.
///
/// `col` is treated as a 1-based byte offset into `body`. A narrow window
/// (±40 bytes) is logged at error level with a caret line, and a wide window
/// (±200 bytes) at debug level.
///
/// All window bounds are snapped to UTF-8 character boundaries, so bodies
/// containing multi-byte characters can be logged without panicking, and the
/// caret offset is counted in characters rather than bytes.
///
/// NOTE(review): callers pass `serde_json::Error::column()`, which is a
/// column within the failing line; for multi-line bodies the window may not
/// be centred on the actual failure.
fn log_json_context(body: &str, col: usize) {
    const NARROW: usize = 40;
    const WIDE: usize = 200;

    /// Largest char boundary in `text` that is `<= index` (clamped to the length).
    fn floor_boundary(text: &str, index: usize) -> usize {
        let mut at = index.min(text.len());
        // Offset 0 is always a boundary, so this terminates.
        while !text.is_char_boundary(at) {
            at -= 1;
        }
        at
    }

    /// Smallest char boundary in `text` that is `>= index` (clamped to the length).
    fn ceil_boundary(text: &str, index: usize) -> usize {
        let mut at = index.min(text.len());
        // `text.len()` is always a boundary, so this terminates.
        while !text.is_char_boundary(at) {
            at += 1;
        }
        at
    }

    let pos = floor_boundary(body, col.saturating_sub(1));

    let narrow_start = floor_boundary(body, pos.saturating_sub(NARROW));
    let narrow_end = ceil_boundary(body, pos.saturating_add(NARROW));
    let narrow = &body[narrow_start..narrow_end];
    // Characters, not bytes, so the caret lines up with non-ASCII text too.
    let arrow_offset = body[narrow_start..pos].chars().count();
    error!(
        "JSON context (col {}): …{}…\n{:>width$}",
        col,
        narrow,
        "^",
        width = arrow_offset + 1
    );

    let wide_start = floor_boundary(body, pos.saturating_sub(WIDE));
    let wide_end = ceil_boundary(body, pos.saturating_add(WIDE));
    debug!(
        "JSON wide context (cols {}–{}):\n{}",
        wide_start,
        wide_end,
        &body[wide_start..wide_end]
    );
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
use std::time::Duration;
// Pins every field of `ClientConfig::default()`.
#[test]
fn client_config_default_matches_docs() {
let cfg = ClientConfig::default();
assert_eq!(cfg.timeout, Duration::from_secs(30));
assert_eq!(cfg.max_retries, 3);
assert_eq!(cfg.initial_backoff, Duration::from_millis(500));
assert_eq!(cfg.max_backoff, Duration::from_secs(5));
assert_eq!(cfg.retry_on_status, vec![408, 429, 502, 503, 504]);
assert_eq!(cfg.user_agent, "StoreLib");
}
// Backoff doubles per attempt (100, 200, 400 ms) and then stays at the 800 ms cap.
#[test]
fn backoff_for_doubles_until_cap() {
let cfg = ClientConfig {
initial_backoff: Duration::from_millis(100),
max_backoff: Duration::from_millis(800),
..Default::default()
};
assert_eq!(cfg.backoff_for(0), Duration::from_millis(100));
assert_eq!(cfg.backoff_for(1), Duration::from_millis(200));
assert_eq!(cfg.backoff_for(2), Duration::from_millis(400));
assert_eq!(cfg.backoff_for(3), Duration::from_millis(800));
assert_eq!(cfg.backoff_for(4), Duration::from_millis(800));
assert_eq!(cfg.backoff_for(50), Duration::from_millis(800));
}
// Smoke test: shift amounts at or beyond the u64 bit width must not panic.
#[test]
fn backoff_for_handles_huge_attempt_without_overflow() {
let cfg = ClientConfig::default();
let _ = cfg.backoff_for(64);
let _ = cfg.backoff_for(u32::MAX);
}
// A custom config passed to `with_config` is readable back through `config()`.
#[test]
fn with_config_keeps_overrides() {
let cfg = ClientConfig {
user_agent: "TestUA/1.0".into(),
max_retries: 7,
..Default::default()
};
let h = DisplayCatalogHandler::with_config(
DCatEndpoint::Production,
Locale::production(),
cfg.clone(),
);
assert_eq!(h.config().max_retries, 7);
assert_eq!(h.config().user_agent, "TestUA/1.0");
}
// Without a token, `race_cancel` just returns the operation's result.
#[tokio::test]
async fn race_cancel_returns_op_result_when_not_cancelled() {
let op = async { Ok::<_, StoreError>(42_u32) };
let result = race_cancel(op, None).await.unwrap();
assert_eq!(result, 42);
}
// A token that never fires does not interfere with the operation's result.
#[tokio::test]
async fn race_cancel_returns_op_result_when_token_uncancelled() {
let token = CancellationToken::new();
let op = async { Ok::<_, StoreError>("done") };
let result = race_cancel(op, Some(&token)).await.unwrap();
assert_eq!(result, "done");
}
// The token is cancelled after 10 ms while the operation would take 60 s,
// so the race must resolve to `Cancelled`.
#[tokio::test]
async fn race_cancel_returns_cancelled_when_token_fires_first() {
let token = CancellationToken::new();
let canceller = token.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
canceller.cancel();
});
let op = async {
tokio::time::sleep(Duration::from_secs(60)).await;
Ok::<_, StoreError>(())
};
let err = race_cancel(op, Some(&token)).await.unwrap_err();
assert!(matches!(err, StoreError::Cancelled));
}
// A token cancelled before the race starts wins over a still-pending operation.
#[tokio::test]
async fn race_cancel_returns_cancelled_when_token_already_cancelled() {
let token = CancellationToken::new();
token.cancel();
let op = async {
tokio::time::sleep(Duration::from_millis(50)).await;
Ok::<_, StoreError>(())
};
let err = race_cancel(op, Some(&token)).await.unwrap_err();
assert!(matches!(err, StoreError::Cancelled));
}
// An error produced by the operation is passed through unchanged.
#[tokio::test]
async fn race_cancel_propagates_op_error() {
let token = CancellationToken::new();
let op = async { Err::<u32, _>(StoreError::NotFound) };
let err = race_cancel(op, Some(&token)).await.unwrap_err();
assert!(matches!(err, StoreError::NotFound));
}
// Test helper: production handler pre-loaded with a listing parsed from `json`
// (panics if the JSON does not deserialize).
fn handler_with_listing(json: &str) -> DisplayCatalogHandler {
let model: DisplayCatalogModel = serde_json::from_str(json).unwrap();
let mut h = DisplayCatalogHandler::production();
h.product_listing = Some(model);
h.is_found = true;
h
}
// A fresh handler has no listing, so every accessor yields `None` / empty.
#[test]
fn accessors_return_none_on_empty_handler() {
let h = DisplayCatalogHandler::production();
assert!(h.product().is_none());
assert!(h.title().is_none());
assert!(h.price().is_none());
assert!(h.wu_category_id().is_none());
assert!(h.packages().is_empty());
assert!(h.availabilities().is_empty());
assert!(h.products().is_empty());
}
// title / publisher_name / description read from the first localized-property entry.
#[test]
fn title_publisher_description_walk_localized_properties() {
let json = r#"{
"Products":[{
"LocalizedProperties":[{
"ProductTitle":"Netflix",
"ProductDescription":"Watch shows",
"PublisherName":"Netflix, Inc."
}]
}]
}"#;
let h = handler_with_listing(json);
assert_eq!(h.title(), Some("Netflix"));
assert_eq!(h.publisher_name(), Some("Netflix, Inc."));
assert_eq!(h.description(), Some("Watch shows"));
}
// When only the singular "Product" field is present, `product()` falls back to it.
#[test]
fn product_falls_back_to_single_product_field() {
let json = r#"{
"Product":{
"LocalizedProperties":[{"ProductTitle":"FromProduct"}]
}
}"#;
let h = handler_with_listing(json);
assert!(h.product().is_some());
assert_eq!(h.title(), Some("FromProduct"));
}
// `price()` reaches the price nested under the first availability's order-management data.
#[test]
fn price_walks_to_first_availability() {
let json = r#"{
"Products":[{
"DisplaySkuAvailabilities":[{
"Sku":{"Properties":{}},
"Availabilities":[{
"OrderManagementData":{
"Price":{"CurrencyCode":"USD","MSRP":9.99,"ListPrice":4.99}
}
}]
}]
}]
}"#;
let h = handler_with_listing(json);
let p = h.price().expect("price should be present");
assert_eq!(p.currency_code.as_deref(), Some("USD"));
assert_eq!(p.msrp, Some(9.99));
assert_eq!(p.list_price, Some(4.99));
assert_eq!(h.prices().len(), 1);
}
// `wu_category_id()` and `packages()` both read from the first SKU's properties.
#[test]
fn packages_and_wu_category_id_walk_sku_properties() {
let json = r#"{
"Products":[{
"DisplaySkuAvailabilities":[{
"Sku":{"Properties":{
"FulfillmentData":{"WuCategoryId":"cat-abc"},
"Packages":[
{"PackageFullName":"X.Y_1","MaxDownloadSizeInBytes":1234},
{"PackageFullName":"X.Y_2","MaxDownloadSizeInBytes":5678}
]
}}
}]
}]
}"#;
let h = handler_with_listing(json);
assert_eq!(h.wu_category_id(), Some("cat-abc"));
assert_eq!(h.packages().len(), 2);
assert_eq!(h.packages()[0].package_full_name.as_deref(), Some("X.Y_1"),);
assert_eq!(h.packages()[1].max_download_size_in_bytes, Some(5678));
}
// Images are filtered by exact purpose; an unknown purpose yields no images.
#[test]
fn images_with_purpose_filters_correctly() {
let json = r#"{
"Products":[{
"LocalizedProperties":[{
"Images":[
{"ImagePurpose":"Logo","Uri":"//img/logo.png","Height":100,"Width":100},
{"ImagePurpose":"Tile","Uri":"//img/tile.png","Height":300,"Width":300},
{"ImagePurpose":"Screenshot","Uri":"//img/ss1.png","Height":720,"Width":1280},
{"ImagePurpose":"Screenshot","Uri":"//img/ss2.png","Height":720,"Width":1280}
]
}]
}]
}"#;
let h = handler_with_listing(json);
assert_eq!(h.images_with_purpose("Logo").len(), 1);
assert_eq!(h.images_with_purpose("Tile").len(), 1);
assert_eq!(h.images_with_purpose("Screenshot").len(), 2);
assert_eq!(h.images_with_purpose("Banner").len(), 0);
}
// `products()` exposes every product in order, while `title()` still reads the first one.
#[test]
fn products_returns_all_products_in_batch_response() {
let json = r#"{
"Products":[
{"LocalizedProperties":[{"ProductTitle":"A"}]},
{"LocalizedProperties":[{"ProductTitle":"B"}]},
{"LocalizedProperties":[{"ProductTitle":"C"}]}
]
}"#;
let h = handler_with_listing(json);
let titles: Vec<_> = h
.products()
.iter()
.filter_map(|p| {
p.localized_properties
.as_deref()?
.first()?
.product_title
.as_deref()
})
.collect();
assert_eq!(titles, vec!["A", "B", "C"]);
assert_eq!(h.title(), Some("A"));
}
// An empty id list is rejected with `StoreError::Other` before any request is made.
#[tokio::test]
async fn query_dcat_batch_rejects_empty_ids() {
let mut h = DisplayCatalogHandler::production();
let err = h.query_dcat_batch(&[], None).await.unwrap_err();
assert!(matches!(err, StoreError::Other(_)));
}
// Serves a two-product batch body from a mock server and checks that it
// deserializes into two products.
// NOTE(review): despite its name this test drives `send_with_retry` and
// `serde_json` directly; it never calls `query_dcat_batch`.
#[tokio::test]
async fn query_dcat_batch_populates_products_on_success() {
let server = MockServer::start().await;
let body = r#"{"Products":[
{"LocalizedProperties":[{"ProductTitle":"A"}]},
{"LocalizedProperties":[{"ProductTitle":"B"}]}
],"TotalResultCount":2}"#;
Mock::given(method("GET"))
.respond_with(
ResponseTemplate::new(200)
.insert_header("Content-Type", "application/json")
.set_body_string(body),
)
.expect(1)
.mount(&server)
.await;
let cfg = fast_retry_cfg();
let client = reqwest::Client::new();
let url = format!(
"{}?bigIds=A,B&market=US&languages=en&catalogsource=apps&fieldsTemplate=Details",
server.uri()
);
let resp = send_with_retry(|| client.get(&url), &cfg, None, |_, _| {})
.await
.unwrap();
assert_eq!(resp.status(), 200);
let text = resp.text().await.unwrap();
let model: DisplayCatalogModel = serde_json::from_str(&text).unwrap();
assert_eq!(model.products.as_deref().unwrap().len(), 2);
}
use wiremock::matchers::method;
use wiremock::{Mock, MockServer, ResponseTemplate};
// Test helper: default config with millisecond-scale backoff so retry tests run quickly.
fn fast_retry_cfg() -> ClientConfig {
ClientConfig {
initial_backoff: Duration::from_millis(1),
max_backoff: Duration::from_millis(2),
max_retries: 3,
..Default::default()
}
}
// A 200 on the first try is returned after exactly one request.
#[tokio::test]
async fn send_with_retry_succeeds_on_first_attempt() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.respond_with(ResponseTemplate::new(200).set_body_string("ok"))
.expect(1) .mount(&server)
.await;
let client = reqwest::Client::new();
let cfg = fast_retry_cfg();
let resp = send_with_retry(|| client.get(server.uri()), &cfg, None, |_, _| {})
.await
.unwrap();
assert_eq!(resp.status(), 200);
}
// Two 503 responses are retried; the third request gets 200 and is returned.
#[tokio::test]
async fn send_with_retry_retries_on_503_then_succeeds() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.respond_with(ResponseTemplate::new(503))
.up_to_n_times(2)
.expect(2)
.mount(&server)
.await;
Mock::given(method("GET"))
.respond_with(ResponseTemplate::new(200).set_body_string("ok"))
.expect(1)
.mount(&server)
.await;
let client = reqwest::Client::new();
let cfg = fast_retry_cfg();
let resp = send_with_retry(|| client.get(server.uri()), &cfg, None, |_, _| {})
.await
.unwrap();
assert_eq!(resp.status(), 200);
}
// With max_retries = 3 a permanently failing server sees 4 requests, and the
// last 503 response is returned as `Ok`.
#[tokio::test]
async fn send_with_retry_gives_up_after_max_retries() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.respond_with(ResponseTemplate::new(503))
.expect(4) .mount(&server)
.await;
let client = reqwest::Client::new();
let cfg = fast_retry_cfg();
let resp = send_with_retry(|| client.get(server.uri()), &cfg, None, |_, _| {})
.await
.unwrap();
assert_eq!(resp.status(), 503);
}
// 404 is not in `retry_on_status`, so it is returned after a single request.
#[tokio::test]
async fn send_with_retry_does_not_retry_on_4xx() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.respond_with(ResponseTemplate::new(404))
.expect(1) .mount(&server)
.await;
let client = reqwest::Client::new();
let cfg = fast_retry_cfg();
let resp = send_with_retry(|| client.get(server.uri()), &cfg, None, |_, _| {})
.await
.unwrap();
assert_eq!(resp.status(), 404);
}
// Each retry reports "retry.wait" then "retry.attempt"; the first attempt reports nothing.
#[tokio::test]
async fn send_with_retry_emits_progress_per_attempt() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.respond_with(ResponseTemplate::new(503))
.up_to_n_times(2)
.expect(2)
.mount(&server)
.await;
Mock::given(method("GET"))
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&server)
.await;
let log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let log_cb = log.clone();
let client = reqwest::Client::new();
let cfg = fast_retry_cfg();
let _ = send_with_retry(
|| client.get(server.uri()),
&cfg,
None,
|stage, _msg| log_cb.lock().unwrap().push(stage.to_string()),
)
.await
.unwrap();
let stages = log.lock().unwrap().clone();
assert_eq!(
stages,
vec![
"retry.wait".to_string(),
"retry.attempt".to_string(),
"retry.wait".to_string(),
"retry.attempt".to_string(),
],
);
}
// The first request gets 503, then a 60 s backoff starts; cancelling after
// 50 ms must abort that sleep and return `Cancelled` well within 2 s.
#[tokio::test]
async fn send_with_retry_cancel_during_backoff_returns_cancelled_fast() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.respond_with(ResponseTemplate::new(503))
.expect(1)
.mount(&server)
.await;
let cfg = ClientConfig {
initial_backoff: Duration::from_secs(60),
max_backoff: Duration::from_secs(60),
max_retries: 3,
..Default::default()
};
let token = CancellationToken::new();
let canceller = token.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(50)).await;
canceller.cancel();
});
let client = reqwest::Client::new();
let started = std::time::Instant::now();
let err = send_with_retry(|| client.get(server.uri()), &cfg, Some(&token), |_, _| {})
.await
.unwrap_err();
let elapsed = started.elapsed();
assert!(matches!(err, StoreError::Cancelled));
assert!(
elapsed < Duration::from_secs(2),
"cancel-during-backoff took too long: {:?}",
elapsed,
);
}
// A plain event serializes its field names as written and its counters as JSON null.
#[test]
fn progress_event_serializes_camel_case() {
let e = ProgressEvent {
stage: "fe3.syncUpdates",
message: "cookie 256 bytes".into(),
current: None,
total: None,
};
let json = serde_json::to_string(&e).unwrap();
assert!(
json.contains("\"stage\":\"fe3.syncUpdates\""),
"got: {json}"
);
assert!(
json.contains("\"message\":\"cookie 256 bytes\""),
"got: {json}",
);
assert!(json.contains("\"current\":null"), "got: {json}");
assert!(json.contains("\"total\":null"), "got: {json}");
}
// A counted event serializes `current` and `total` as plain numbers.
#[test]
fn progress_event_counter_serializes() {
let e = ProgressEvent {
stage: "fe3.resolveUrls.done",
message: "URLs resolved".into(),
current: Some(7),
total: Some(7),
};
let json = serde_json::to_string(&e).unwrap();
assert!(json.contains("\"current\":7"), "got: {json}");
assert!(json.contains("\"total\":7"), "got: {json}");
}
// Test helper: production handler whose progress callback appends every event
// to the returned shared log.
fn capturing_handler() -> (DisplayCatalogHandler, Arc<Mutex<Vec<ProgressEvent>>>) {
let log = Arc::new(Mutex::new(Vec::<ProgressEvent>::new()));
let log_cb = log.clone();
let mut h = DisplayCatalogHandler::production();
h.set_progress_callback(Box::new(move |e| log_cb.lock().unwrap().push(e)));
(h, log)
}
// `emit` delivers exactly one event with the given stage/message and no counters.
#[test]
fn emit_invokes_callback() {
let (h, log) = capturing_handler();
h.emit("dcat.request", "GET id=foo");
let events = log.lock().unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].stage, "dcat.request");
assert_eq!(events[0].message, "GET id=foo");
assert_eq!(events[0].current, None);
assert_eq!(events[0].total, None);
}
// `emit_counter` forwards `current` and `total` into the delivered event.
#[test]
fn emit_counter_carries_progress_numbers() {
let (h, log) = capturing_handler();
h.emit_counter("fe3.resolveUrls.done", "URLs resolved", 5, 12);
let events = log.lock().unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].stage, "fe3.resolveUrls.done");
assert_eq!(events[0].current, Some(5));
assert_eq!(events[0].total, Some(12));
}
// Events reach the callback in the order they were emitted.
#[test]
fn multiple_emits_preserve_order() {
let (h, log) = capturing_handler();
h.emit("dcat.request", "step 1");
h.emit("dcat.response", "step 2");
h.emit_counter("dcat.parse", "step 3", 1, 1);
h.emit("dcat.done", "step 4");
let stages: Vec<&str> = log.lock().unwrap().iter().map(|e| e.stage).collect();
assert_eq!(
stages,
vec!["dcat.request", "dcat.response", "dcat.parse", "dcat.done"],
);
}
// Emitting without a registered callback is a silent no-op.
#[test]
fn no_callback_means_no_panic() {
let h = DisplayCatalogHandler::production();
h.emit("x", "y");
h.emit_counter("x", "y", 1, 1);
}
// After `clear_progress_callback`, later events are no longer delivered.
#[test]
fn clear_callback_stops_delivery() {
let (mut h, log) = capturing_handler();
h.emit("first", "");
h.clear_progress_callback();
h.emit("second", "");
let events = log.lock().unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].stage, "first");
}
// Installing a second callback replaces the first: each log sees only the
// event emitted while its callback was active.
#[test]
fn set_callback_replaces_previous() {
let log_a: Arc<Mutex<Vec<ProgressEvent>>> = Arc::new(Mutex::new(Vec::new()));
let log_b: Arc<Mutex<Vec<ProgressEvent>>> = Arc::new(Mutex::new(Vec::new()));
let mut h = DisplayCatalogHandler::production();
let a = log_a.clone();
h.set_progress_callback(Box::new(move |e| a.lock().unwrap().push(e)));
h.emit("first", "");
let b = log_b.clone();
h.set_progress_callback(Box::new(move |e| b.lock().unwrap().push(e)));
h.emit("second", "");
assert_eq!(log_a.lock().unwrap().len(), 1);
assert_eq!(log_a.lock().unwrap()[0].stage, "first");
assert_eq!(log_b.lock().unwrap().len(), 1);
assert_eq!(log_b.lock().unwrap()[0].stage, "second");
}
}