use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use crate::progress::Progress;
pub type ProgressCallback = Arc<dyn Fn(&Progress) + Send + Sync>;
pub type RetryDelayFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
pub type RetryDelayProvider = Arc<dyn Fn(Duration) -> RetryDelayFuture + Send + Sync>;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryPolicy {
pub max_retries: u32,
pub base_backoff: Duration,
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
max_retries: 3,
base_backoff: Duration::from_millis(100),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FetchPhase {
#[default]
Connecting,
Downloading,
Verifying,
Committing,
Completed,
}
impl std::fmt::Display for FetchPhase {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FetchPhase::Connecting => write!(f, "Connecting"),
FetchPhase::Downloading => write!(f, "Downloading"),
FetchPhase::Verifying => write!(f, "Verifying"),
FetchPhase::Committing => write!(f, "Committing"),
FetchPhase::Completed => write!(f, "Completed"),
}
}
}
#[derive(Clone)]
pub struct FetchOptions {
pub checksum: Option<[u8; 32]>,
pub retry_policy: RetryPolicy,
pub expected_bytes: Option<u64>,
pub resume_offset: Option<u64>,
pub headers: Arc<[(String, String)]>,
pub on_progress: Option<ProgressCallback>,
pub retry_delay_provider: Option<RetryDelayProvider>,
}
impl fmt::Debug for FetchOptions {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FetchOptions")
.field("checksum", &self.checksum)
.field("retry_policy", &self.retry_policy)
.field("expected_bytes", &self.expected_bytes)
.field("resume_offset", &self.resume_offset)
.field("headers", &self.headers)
.field("on_progress", &"{ ... }")
.field("retry_delay_provider", &"{ ... }")
.finish()
}
}
impl Default for FetchOptions {
fn default() -> Self {
Self {
checksum: None,
retry_policy: RetryPolicy::default(),
expected_bytes: None,
resume_offset: None,
headers: Arc::new([]),
on_progress: None,
retry_delay_provider: None,
}
}
}
impl FetchOptions {
#[must_use]
pub fn checksum(mut self, checksum: Option<[u8; 32]>) -> Self {
self.checksum = checksum;
self
}
#[must_use]
pub fn max_retries(mut self, max_retries: u32) -> Self {
self.retry_policy.max_retries = max_retries;
self
}
#[must_use]
pub fn retry_backoff(mut self, retry_backoff: Duration) -> Self {
self.retry_policy.base_backoff = retry_backoff;
self
}
#[must_use]
pub fn retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
self.retry_policy = retry_policy;
self
}
#[must_use]
pub fn expected_bytes(mut self, expected_bytes: Option<u64>) -> Self {
self.expected_bytes = expected_bytes;
self
}
#[must_use]
pub fn resume_offset(mut self, resume_offset: Option<u64>) -> Self {
self.resume_offset = resume_offset;
self
}
#[must_use]
pub fn header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
let mut headers: Vec<_> = self.headers.iter().cloned().collect();
headers.push((key.into(), value.into()));
self.headers = Arc::from(headers);
self
}
#[must_use]
pub fn headers(mut self, headers: Vec<(String, String)>) -> Self {
self.headers = Arc::from(headers);
self
}
#[must_use]
pub fn on_progress(mut self, on_progress: Arc<dyn Fn(&Progress) + Send + Sync>) -> Self {
self.on_progress = Some(on_progress);
self
}
#[must_use]
pub fn retry_delay_provider(mut self, provider: RetryDelayProvider) -> Self {
self.retry_delay_provider = Some(provider);
self
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
#[test]
fn test_fetch_phase_display() {
assert_eq!(FetchPhase::Connecting.to_string(), "Connecting");
assert_eq!(FetchPhase::Downloading.to_string(), "Downloading");
assert_eq!(FetchPhase::Verifying.to_string(), "Verifying");
assert_eq!(FetchPhase::Committing.to_string(), "Committing");
assert_eq!(FetchPhase::Completed.to_string(), "Completed");
}
#[test]
fn test_fetch_phase_default() {
assert_eq!(FetchPhase::default(), FetchPhase::Connecting);
}
#[test]
fn test_fetch_options_default() {
let options = FetchOptions::default();
assert!(options.checksum.is_none());
assert_eq!(options.retry_policy.max_retries, 3);
assert_eq!(
options.retry_policy.base_backoff,
Duration::from_millis(100)
);
assert_eq!(options.expected_bytes, None);
assert_eq!(options.resume_offset, None);
assert!(options.headers.is_empty());
assert!(options.on_progress.is_none());
assert!(options.retry_delay_provider.is_none());
}
#[test]
fn test_fetch_options_checksum() {
let hash = [1u8; 32];
let options = FetchOptions::default().checksum(Some(hash));
assert_eq!(options.checksum, Some(hash));
let options = FetchOptions::default().checksum(None);
assert!(options.checksum.is_none());
}
#[test]
fn test_fetch_options_max_retries() {
let options = FetchOptions::default().max_retries(5);
assert_eq!(options.retry_policy.max_retries, 5);
let options = FetchOptions::default().max_retries(0);
assert_eq!(options.retry_policy.max_retries, 0);
}
#[test]
fn test_fetch_options_retry_backoff() {
let duration = Duration::from_secs(1);
let options = FetchOptions::default().retry_backoff(duration);
assert_eq!(options.retry_policy.base_backoff, duration);
}
#[test]
fn test_fetch_options_resume_and_expected_bytes() {
let options = FetchOptions::default()
.resume_offset(Some(128))
.expected_bytes(Some(512));
assert_eq!(options.resume_offset, Some(128));
assert_eq!(options.expected_bytes, Some(512));
}
#[test]
fn test_fetch_options_header() {
let options = FetchOptions::default()
.header("Authorization", "Bearer token")
.header("User-Agent", "MyApp/1.0");
let headers: Vec<_> = options.headers.iter().cloned().collect();
assert_eq!(headers.len(), 2);
assert!(headers.contains(&("Authorization".to_string(), "Bearer token".to_string())));
assert!(headers.contains(&("User-Agent".to_string(), "MyApp/1.0".to_string())));
}
#[test]
fn test_fetch_options_headers() {
let headers = vec![
("Authorization".to_string(), "Bearer token".to_string()),
("User-Agent".to_string(), "MyApp/1.0".to_string()),
];
let options = FetchOptions::default().headers(headers.clone());
let options_headers: Vec<_> = options.headers.iter().cloned().collect();
assert_eq!(options_headers, headers);
}
#[test]
fn test_fetch_options_headers_replace() {
let options = FetchOptions::default()
.header("Old", "value")
.headers(vec![("New".to_string(), "value".to_string())]);
let headers: Vec<_> = options.headers.iter().cloned().collect();
assert_eq!(headers.len(), 1);
assert!(headers.contains(&("New".to_string(), "value".to_string())));
assert!(!headers.iter().any(|(k, _)| k == "Old"));
}
#[test]
fn test_fetch_options_on_progress() {
let call_count = Arc::new(AtomicU32::new(0));
let call_count_clone = call_count.clone();
let options = FetchOptions::default().on_progress(Arc::new(move |_| {
call_count_clone.fetch_add(1, Ordering::SeqCst);
}));
assert!(options.on_progress.is_some());
if let Some(callback) = &options.on_progress {
let progress = Progress {
phase: FetchPhase::Downloading,
bytes_downloaded: 100,
total_bytes: Some(1000),
retry_count: 0,
performance_metrics: None,
};
callback(&progress);
assert_eq!(call_count.load(Ordering::SeqCst), 1);
}
}
#[test]
fn test_fetch_options_debug() {
let options = FetchOptions::default()
.checksum(Some([1u8; 32]))
.max_retries(5)
.header("Test", "value");
let debug_str = format!("{:?}", options);
assert!(debug_str.contains("FetchOptions"));
assert!(debug_str.contains("checksum: Some(["));
assert!(debug_str.contains("retry_policy"));
assert!(debug_str.contains("{ ... }"));
}
#[test]
fn test_fetch_options_builder_pattern() {
let hash = [2u8; 32];
let options = FetchOptions::default()
.checksum(Some(hash))
.max_retries(10)
.retry_backoff(Duration::from_millis(500))
.header("Custom", "header");
assert_eq!(options.checksum, Some(hash));
assert_eq!(options.retry_policy.max_retries, 10);
assert_eq!(
options.retry_policy.base_backoff,
Duration::from_millis(500)
);
assert_eq!(options.headers.len(), 1);
assert!(options.retry_delay_provider.is_none());
let options2 = FetchOptions::default()
.checksum(Some(hash))
.max_retries(10)
.retry_backoff(Duration::from_millis(500))
.headers(vec![("Another".to_string(), "header".to_string())]);
assert_eq!(options2.checksum, Some(hash));
assert_eq!(options2.retry_policy.max_retries, 10);
assert_eq!(
options2.retry_policy.base_backoff,
Duration::from_millis(500)
);
assert_eq!(options2.headers.len(), 1);
assert!(options2.retry_delay_provider.is_none());
}
#[test]
fn test_fetch_options_retry_delay_provider() {
let options =
FetchOptions::default().retry_delay_provider(Arc::new(|_| Box::pin(async {})));
assert!(options.retry_delay_provider.is_some());
}
#[test]
fn test_fetch_options_clone() {
let options = FetchOptions::default()
.checksum(Some([3u8; 32]))
.header("Test", "value");
let cloned = options.clone();
assert_eq!(cloned.checksum, options.checksum);
assert_eq!(cloned.retry_policy, options.retry_policy);
assert_eq!(cloned.headers.as_ptr(), options.headers.as_ptr()); }
}