use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use crate::chunk_outcome::ChunkOutcome;
use crate::error::{InnerErrorCode, MeowError};
use crate::inner::UniqueId;
use crate::transfer_executor_trait::TransferTrait;
use crate::transfer_task::TransferTask;
/// Delay in milliseconds used for the first retry; `calc_backoff_with_jitter_ms`
/// doubles it for each further attempt.
const CHUNK_RETRY_BASE_DELAY_MS: u64 = 200;
/// Ceiling in milliseconds applied to the exponential delay before jitter is added.
const CHUNK_RETRY_MAX_DELAY_MS: u64 = 5_000;
/// Jitter amplitude in percent: the capped delay is scaled to somewhere between
/// `100 - N` and `100 + N` percent of its value.
const CHUNK_RETRY_JITTER_PERCENT: u64 = 20;
/// Error codes that `is_transport_retryable` accepts as retry candidates.
/// Any code that is not listed here is treated as non-retryable.
const RETRYABLE_CHUNK_ERROR_CODES: &[i32] = &[
InnerErrorCode::HttpError as i32,
InnerErrorCode::ResponseStatusError as i32,
];
/// Final result reported by `transfer_chunk_with_retry`.
pub(crate) enum ChunkRetryResult {
    /// An attempt succeeded; carries the outcome returned by the executor.
    Done(ChunkOutcome),
    /// Cancellation was observed before an attempt, after a failed attempt,
    /// or while waiting for the next retry.
    Cancelled,
    /// The last error was not retryable, or no retries were left.
    Failed(MeowError),
}
/// Selects which executor method `transfer_chunk_with_retry` invokes.
#[derive(Copy, Clone)]
pub(crate) enum ChunkTransferMode {
    /// Call `transfer_chunk` on the executor.
    Whole,
    /// Call `transfer_chunk_part` on the executor.
    Part,
}
/// Transfers a single chunk through `executor`, retrying retryable failures
/// with jittered exponential backoff.
///
/// * `executor` - performs the transfer; `mode` selects `transfer_chunk` or
///   `transfer_chunk_part`.
/// * `task`, `offset`, `chunk_size`, `known_total` - forwarded unchanged to the
///   executor on every attempt.
/// * `key` - only used to tag log lines.
/// * `cancel` - checked before every attempt and after every failure, and raced
///   against the backoff sleep.
/// * `max_chunk_retries` - how many retries may follow the first attempt.
///
/// Returns `ChunkRetryResult::Done` with the executor's outcome on success,
/// `ChunkRetryResult::Cancelled` when cancellation was observed, and
/// `ChunkRetryResult::Failed` when the error is rejected by
/// `is_transport_retryable` or the retry budget is used up.
// The parameters are the executor call's arguments plus the retry context.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn transfer_chunk_with_retry(
    executor: &Arc<dyn TransferTrait>,
    task: &TransferTask,
    key: &UniqueId,
    cancel: &CancellationToken,
    offset: u64,
    chunk_size: u64,
    known_total: u64,
    max_chunk_retries: u32,
    mode: ChunkTransferMode,
) -> ChunkRetryResult {
    // Counts the failed attempts that have already been followed by a backoff wait.
    let mut failed_attempts = 0u32;

    loop {
        // Never start an attempt once cancellation has been requested.
        if cancel.is_cancelled() {
            crate::meow_flow_log!(
                "chunk_retry",
                "cancel before transfer: key={:?} offset={} attempt={}",
                key,
                offset,
                failed_attempts
            );
            return ChunkRetryResult::Cancelled;
        }

        let attempt_result = match mode {
            ChunkTransferMode::Part => {
                executor
                    .transfer_chunk_part(task, offset, chunk_size, known_total)
                    .await
            }
            ChunkTransferMode::Whole => {
                executor
                    .transfer_chunk(task, offset, chunk_size, known_total)
                    .await
            }
        };

        // Success leaves the function right away; only failures continue below.
        let err = match attempt_result {
            Err(err) => err,
            Ok(outcome) => {
                if failed_attempts > 0 {
                    crate::meow_flow_log!(
                        "chunk_retry",
                        "retry recovered: key={:?} offset={} attempts_used={} next_offset={}",
                        key,
                        offset,
                        failed_attempts,
                        outcome.next_offset
                    );
                }
                return ChunkRetryResult::Done(outcome);
            }
        };

        // A cancellation that arrived while the attempt was running takes
        // precedence over the error it produced.
        if cancel.is_cancelled() {
            crate::meow_flow_log!(
                "chunk_retry",
                "cancel after chunk error: key={:?} offset={} attempt={} err={}",
                key,
                offset,
                failed_attempts,
                err
            );
            return ChunkRetryResult::Cancelled;
        }

        let retryable = is_transport_retryable(&err);
        let may_retry = retryable && failed_attempts < max_chunk_retries;
        if !may_retry {
            crate::meow_flow_log!(
                "chunk_retry",
                "give up: key={:?} offset={} attempt={} max_retries={} retryable={} err={}",
                key,
                offset,
                failed_attempts,
                max_chunk_retries,
                retryable,
                err
            );
            return ChunkRetryResult::Failed(err);
        }

        let delay_ms = calc_backoff_with_jitter_ms(failed_attempts);
        // Cannot overflow: `failed_attempts < max_chunk_retries` holds at this point.
        let upcoming_attempt = failed_attempts + 1;
        crate::meow_flow_log!(
            "chunk_retry",
            "retry scheduled: key={:?} offset={} attempt={} next_delay_ms={} err={}",
            key,
            offset,
            upcoming_attempt,
            delay_ms,
            err
        );

        // Wait out the backoff, but stop waiting as soon as cancellation fires.
        tokio::select! {
            _ = cancel.cancelled() => {
                crate::meow_flow_log!(
                    "chunk_retry",
                    "cancel during backoff wait: key={:?} offset={} attempt={}",
                    key,
                    offset,
                    failed_attempts
                );
                return ChunkRetryResult::Cancelled;
            }
            _ = sleep(Duration::from_millis(delay_ms)) => {}
        }

        failed_attempts = upcoming_attempt;
    }
}
/// Decides whether a failed chunk transfer is worth retrying.
///
/// Errors whose code is not listed in `RETRYABLE_CHUNK_ERROR_CODES` are never
/// retried. A `ResponseStatusError` that carries an HTTP status defers to
/// `is_retryable_http_status`. Every other listed error is retryable,
/// including a `ResponseStatusError` without a status.
pub(crate) fn is_transport_retryable(err: &MeowError) -> bool {
    let code = err.code();

    let listed = RETRYABLE_CHUNK_ERROR_CODES.contains(&code);
    if !listed {
        return false;
    }

    // Only response-status errors are refined further by their HTTP status.
    if code != InnerErrorCode::ResponseStatusError as i32 {
        return true;
    }

    match err.http_status() {
        Some(status) => is_retryable_http_status(status),
        // No status recorded: keep the error retryable.
        None => true,
    }
}
/// Returns `true` for the HTTP statuses treated as retryable: 408, 429 and
/// every code from 500 through 599.
pub(crate) fn is_retryable_http_status(status: u16) -> bool {
    match status {
        408 | 429 => true,
        other => (500..=599).contains(&other),
    }
}
/// Returns `true` only when the error's code equals `InnerErrorCode::HttpError`.
pub(crate) fn is_connection_layer_retryable(err: &MeowError) -> bool {
    let http_error_code = InnerErrorCode::HttpError as i32;
    err.code() == http_error_code
}
/// Computes the backoff delay in milliseconds for the given zero-based `attempt`.
///
/// The delay starts at `CHUNK_RETRY_BASE_DELAY_MS`, doubles per attempt (the
/// shift is clamped to 20) and is capped at `CHUNK_RETRY_MAX_DELAY_MS`. The
/// capped value is then scaled by a percentage between
/// `100 - CHUNK_RETRY_JITTER_PERCENT` and `100 + CHUNK_RETRY_JITTER_PERCENT`,
/// chosen from the sub-second nanoseconds of the system clock. Because the
/// jitter is applied after the cap, the result may exceed the cap by up to the
/// jitter percentage.
pub(crate) fn calc_backoff_with_jitter_ms(attempt: u32) -> u64 {
    let shift = attempt.min(20);
    let base_delay = CHUNK_RETRY_BASE_DELAY_MS
        .saturating_mul(1u64 << shift)
        .min(CHUNK_RETRY_MAX_DELAY_MS);

    // The clock's sub-second nanoseconds serve as the jitter source; a clock
    // set before the Unix epoch falls back to 0, which picks the lowest factor.
    let clock_nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::from(elapsed.subsec_nanos()),
        Err(_) => 0,
    };

    let lowest_percent = 100 - CHUNK_RETRY_JITTER_PERCENT;
    let percent_choices = CHUNK_RETRY_JITTER_PERCENT * 2 + 1;
    let scale_percent = lowest_percent + clock_nanos % percent_choices;

    base_delay.saturating_mul(scale_percent) / 100
}
#[cfg(test)]
mod tests {
    use super::{is_retryable_http_status, is_transport_retryable};
    use crate::error::{InnerErrorCode, MeowError};

    /// Builds a `ResponseStatusError`, attaching `status` when one is supplied.
    fn response_status_err(status: Option<u16>) -> MeowError {
        let base = MeowError::from_code(InnerErrorCode::ResponseStatusError, "status".to_string());
        if let Some(s) = status {
            base.with_http_status(s)
        } else {
            base
        }
    }

    #[test]
    fn retryable_status_covers_5xx_and_throttle_codes() {
        let retryable = [500u16, 502, 503, 504, 408, 429];
        for s in retryable {
            assert!(is_retryable_http_status(s), "{s} should be retryable");
        }
    }

    #[test]
    fn non_retryable_status_covers_hard_4xx_and_3xx() {
        let rejected = [400u16, 401, 403, 404, 409, 410, 422, 301, 302];
        for s in rejected {
            assert!(!is_retryable_http_status(s), "{s} should not be retryable");
        }
    }

    #[test]
    fn transport_retryable_fast_fails_hard_4xx_response_status() {
        for status in [403u16, 404] {
            let err = response_status_err(Some(status));
            assert!(!is_transport_retryable(&err));
        }
    }

    #[test]
    fn transport_retryable_keeps_5xx_and_throttle_response_status() {
        for status in [500u16, 503, 429] {
            let err = response_status_err(Some(status));
            assert!(is_transport_retryable(&err));
        }
    }

    #[test]
    fn transport_retryable_keeps_status_less_response_status() {
        let err = response_status_err(None);
        assert!(is_transport_retryable(&err));
    }

    #[test]
    fn transport_retryable_keeps_http_error_and_rejects_others() {
        let http_error = MeowError::from_code1(InnerErrorCode::HttpError);
        assert!(is_transport_retryable(&http_error));

        let invalid_range = MeowError::from_code1(InnerErrorCode::InvalidRange);
        assert!(!is_transport_retryable(&invalid_range));
    }
}