use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use crate::chunk_outcome::ChunkOutcome;
use crate::error::{InnerErrorCode, MeowError};
use crate::inner::UniqueId;
use crate::transfer_executor_trait::TransferTrait;
use crate::transfer_task::TransferTask;
/// Delay in milliseconds for the first retry (attempt index 0), before jitter is applied.
/// `calc_backoff_with_jitter_ms` doubles it for every later attempt.
const CHUNK_RETRY_BASE_DELAY_MS: u64 = 200;
/// Cap in milliseconds on the exponential delay.
///
/// The cap is applied *before* jitter, so a real wait can exceed it by up to
/// `CHUNK_RETRY_JITTER_PERCENT` percent.
const CHUNK_RETRY_MAX_DELAY_MS: u64 = 5_000;
/// Half-width of the jitter band, in percent.
///
/// A delay is scaled by a factor in `[100 - J, 100 + J]` percent.
const CHUNK_RETRY_JITTER_PERCENT: u64 = 20;
/// Error codes that `is_transport_retryable` accepts.
///
/// A chunk failing with any other code is not retried by `transfer_chunk_with_retry`.
const RETRYABLE_CHUNK_ERROR_CODES: &[i32] = &[
    // Also the only code accepted by `is_connection_layer_retryable`.
    InnerErrorCode::HttpError as i32,
    // Retryable for chunk transfers, but not for the connection-layer check.
    InnerErrorCode::ResponseStatusError as i32,
];
/// Outcome returned by [`transfer_chunk_with_retry`] for a single chunk.
///
/// Marked `#[must_use]` because dropping it unexamined would silently lose a
/// [`ChunkRetryResult::Failed`] error.
#[must_use = "a chunk retry result may carry a transfer error that must be handled"]
pub(crate) enum ChunkRetryResult {
    /// An attempt succeeded; carries the executor's outcome for the chunk.
    Done(ChunkOutcome),
    /// Cancellation was observed in one of three places:
    /// before an attempt, right after a failed attempt, or during the backoff wait.
    Cancelled,
    /// The last attempt failed with this error.
    ///
    /// Either the error was not retryable, or the retry limit had been reached.
    Failed(MeowError),
}
pub(crate) async fn transfer_chunk_with_retry(
executor: &Arc<dyn TransferTrait>,
task: &TransferTask,
key: &UniqueId,
cancel: &CancellationToken,
offset: u64,
chunk_size: u64,
known_total: u64,
max_chunk_retries: u32,
) -> ChunkRetryResult {
let mut attempt: u32 = 0;
loop {
if cancel.is_cancelled() {
crate::meow_flow_log!(
"chunk_retry",
"cancel before transfer: key={:?} offset={} attempt={}",
key,
offset,
attempt
);
return ChunkRetryResult::Cancelled;
}
match executor
.transfer_chunk(task, offset, chunk_size, known_total)
.await
{
Ok(outcome) => {
if attempt > 0 {
crate::meow_flow_log!(
"chunk_retry",
"retry recovered: key={:?} offset={} attempts_used={} next_offset={}",
key,
offset,
attempt,
outcome.next_offset
);
}
return ChunkRetryResult::Done(outcome);
}
Err(err) => {
if cancel.is_cancelled() {
crate::meow_flow_log!(
"chunk_retry",
"cancel after chunk error: key={:?} offset={} attempt={} err={}",
key,
offset,
attempt,
err
);
return ChunkRetryResult::Cancelled;
}
let retryable = is_transport_retryable(&err);
let reached_limit = attempt >= max_chunk_retries;
if !retryable || reached_limit {
crate::meow_flow_log!(
"chunk_retry",
"give up: key={:?} offset={} attempt={} max_retries={} retryable={} err={}",
key,
offset,
attempt,
max_chunk_retries,
retryable,
err
);
return ChunkRetryResult::Failed(err);
}
let delay_ms = calc_backoff_with_jitter_ms(attempt);
crate::meow_flow_log!(
"chunk_retry",
"retry scheduled: key={:?} offset={} attempt={} next_delay_ms={} err={}",
key,
offset,
attempt + 1,
delay_ms,
err
);
tokio::select! {
_ = cancel.cancelled() => {
crate::meow_flow_log!(
"chunk_retry",
"cancel during backoff wait: key={:?} offset={} attempt={}",
key,
offset,
attempt
);
return ChunkRetryResult::Cancelled;
}
_ = sleep(Duration::from_millis(delay_ms)) => {}
}
attempt += 1;
}
}
}
}
/// Returns `true` when `err`'s code is listed in `RETRYABLE_CHUNK_ERROR_CODES`.
///
/// `transfer_chunk_with_retry` uses this to decide whether a failed chunk attempt may be
/// retried.
pub(crate) fn is_transport_retryable(err: &MeowError) -> bool {
    let code = err.code();
    RETRYABLE_CHUNK_ERROR_CODES.contains(&code)
}
/// Returns `true` only when `err` carries `InnerErrorCode::HttpError`.
///
/// This is narrower than `is_transport_retryable`: `ResponseStatusError` is not accepted
/// here.
// NOTE(review): no caller is visible in this file; presumably used where only
// connection-level failures should be retried — confirm against call sites.
pub(crate) fn is_connection_layer_retryable(err: &MeowError) -> bool {
    let http_error_code = InnerErrorCode::HttpError as i32;
    http_error_code == err.code()
}
/// Computes the backoff delay, in milliseconds, to wait before retry `attempt` (zero-based).
///
/// # Algorithm
///
/// 1. Compute `CHUNK_RETRY_BASE_DELAY_MS * 2^min(attempt, 20)`.
/// 2. Cap that value at `CHUNK_RETRY_MAX_DELAY_MS`.
/// 3. Scale the capped value by a factor in
///    `[100 - CHUNK_RETRY_JITTER_PERCENT, 100 + CHUNK_RETRY_JITTER_PERCENT]` percent.
///
/// Because jitter is applied after the cap, the result may exceed `CHUNK_RETRY_MAX_DELAY_MS`.
///
/// # Jitter source
///
/// The jitter factor comes from the sub-second nanoseconds of the wall clock. This is cheap
/// spreading, not real randomness. If the clock reads before the Unix epoch, the lowest
/// factor is used.
pub(crate) fn calc_backoff_with_jitter_ms(attempt: u32) -> u64 {
    // Clamping the exponent keeps the multiplier small; saturating_mul guards the product anyway.
    let doubling = 2u64.pow(attempt.min(20));
    let base_ms = CHUNK_RETRY_BASE_DELAY_MS
        .saturating_mul(doubling)
        .min(CHUNK_RETRY_MAX_DELAY_MS);

    let entropy = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::from(since_epoch.subsec_nanos()),
        Err(_) => 0,
    };
    // Number of distinct whole-percent factors in the inclusive jitter band.
    let band_width = 2 * CHUNK_RETRY_JITTER_PERCENT + 1;
    let scale_percent = (100 - CHUNK_RETRY_JITTER_PERCENT) + entropy % band_width;

    base_ms.saturating_mul(scale_percent) / 100
}