use crate::protocol::{SourceProtocol, SourceReader, TransferError};
use crate::retry_transient;
use crate::source::resolve_source;
use crate::transfer::TransferConfig;
use bytes::Bytes;
use futures_core::Stream;
use futures_util::StreamExt;
use std::pin::Pin;
use url::Url;
/// Metadata describing a stream opened by [`stream_from_source`] / [`stream_from_url`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamInfo {
    /// Total size in bytes as reported by the source's initial reader, or `None`
    /// when the source did not provide one.
    pub total_size: Option<u64>,
}
/// Resolves `source_url` to a concrete source and opens it as a resumable byte stream.
///
/// `resolve_source` picks the source implementation using `config`. The resolved source
/// is then passed to [`stream_from_source`] along with `config.max_retries`.
///
/// # Errors
///
/// Propagates any error from `resolve_source` or from [`stream_from_source`].
pub async fn stream_from_url(
    source_url: Url,
    config: &TransferConfig,
) -> Result<
    (
        impl Stream<Item = Result<Bytes, TransferError>> + Send,
        StreamInfo,
    ),
    TransferError,
> {
    let resolved = resolve_source(&source_url, config)?;
    let retries = config.max_retries;
    match resolved {
        crate::source::Source::Http(http_source) => {
            stream_from_source(http_source, source_url, retries).await
        }
    }
}
/// Opens `source` at offset 0 and exposes its contents as a resumable byte stream.
///
/// The initial `get_reader` call is wrapped in `retry_transient!` with `max_retries`.
/// The opened reader must report `supports_random_access`. Recovery from a mid-stream
/// transient error works by re-requesting the source at the number of bytes already
/// yielded, which is impossible without random access.
///
/// On success, returns the stream together with a [`StreamInfo`] carrying the
/// `total_size` reported by the initial reader. Transient errors that occur while the
/// stream is consumed are handled by `unfold_step`.
///
/// # Errors
///
/// Returns `TransferError::Permanent` if the source does not support random access.
/// Otherwise returns whatever error `retry_transient!` propagates from the initial
/// `get_reader` call.
pub async fn stream_from_source<S: SourceProtocol + Send + 'static>(
    mut source: S,
    url: Url,
    max_retries: u32,
) -> Result<
    (
        impl Stream<Item = Result<Bytes, TransferError>> + Send,
        StreamInfo,
    ),
    TransferError,
>
where
    S::Reader: Send,
{
    let (first_reader, opened_at) =
        retry_transient!(max_retries, source.get_reader(url.clone(), 0))?;

    if opened_at.supports_random_access {
        let info = StreamInfo {
            total_size: opened_at.total_size,
        };
        let initial_state = UnfoldState {
            source,
            reader_stream: Box::pin(first_reader.stream_bytes()),
            url,
            max_retries,
            retry_count: 0,
            total_bytes_yielded: 0,
        };
        Ok((
            futures_util::stream::try_unfold(initial_state, unfold_step),
            info,
        ))
    } else {
        // Resuming means re-opening the source mid-resource, so refuse up front
        // rather than risk silently losing or duplicating bytes later.
        Err(TransferError::Permanent {
            reason: "Source does not support random access \
                (e.g. no Accept-Ranges header in HTTP response). \
                Streaming requires random access to recover from transient errors \
                without data loss."
                .into(),
        })
    }
}
/// State threaded through `futures_util::stream::try_unfold`. Each call to
/// `unfold_step` takes ownership of it and hands it back alongside the next chunk.
struct UnfoldState<S>
where
    S: SourceProtocol,
{
    /// Source used to open a fresh reader after a transient failure.
    source: S,
    /// Byte stream of the reader currently being consumed.
    reader_stream: Pin<Box<dyn Stream<Item = Result<Bytes, TransferError>> + Send>>,
    /// URL passed back to `get_reader` when resuming.
    url: Url,
    /// Upper bound that `retry_count` is checked against.
    max_retries: u32,
    /// Number of transient failures currently counted against `max_retries`.
    retry_count: u32,
    /// Total bytes handed to the consumer so far; used as the resume offset.
    total_bytes_yielded: u64,
}
/// Step function for `futures_util::stream::try_unfold` that produces the next chunk.
///
/// Returns `Ok(Some((chunk, state)))` for each chunk read and `Ok(None)` once the
/// current reader is exhausted. Returning `Err(..)` terminates the stream.
///
/// When the reader yields `TransferError::Transient`, this function:
/// 1. Sleeps for `crate::transfer::backoff_delay(..)`, which also receives the server's
///    `minimum_retry_delay` hint.
/// 2. Re-opens the source at `total_bytes_yielded`.
/// 3. Continues reading from the new reader.
///
/// The new reader must start exactly at that offset. Bytes already handed to the
/// consumer cannot be taken back.
///
/// `retry_count` counts transient failures since the last non-empty chunk was
/// delivered. It is reset on forward progress, so `max_retries` bounds each individual
/// stall rather than the total number of hiccups over the lifetime of a long stream.
///
/// # Errors
///
/// - `TransferError::Permanent` when more than `max_retries` transient failures occur
///   without forward progress.
/// - `TransferError::Permanent` when the resumed reader reports an offset different
///   from `total_bytes_yielded`.
/// - Any non-transient error from the reader is returned as-is.
/// - Whatever `retry_transient!` propagates while re-opening the source.
async fn unfold_step<S: SourceProtocol + Send + 'static>(
    mut state: UnfoldState<S>,
) -> Result<Option<(Bytes, UnfoldState<S>)>, TransferError>
where
    S::Reader: Send,
{
    loop {
        match state.reader_stream.next().await {
            Some(Ok(bytes)) => {
                if !bytes.is_empty() {
                    state.total_bytes_yielded += bytes.len() as u64;
                    // Real progress was made, so the next transient failure starts a
                    // fresh retry budget. Empty chunks deliberately do not reset it:
                    // a reader that yields nothing between failures must still run
                    // out of retries instead of looping forever.
                    state.retry_count = 0;
                }
                return Ok(Some((bytes, state)));
            }
            Some(Err(TransferError::Transient {
                minimum_retry_delay: server_hint,
                reason,
                ..
            })) => {
                state.retry_count += 1;
                if state.retry_count > state.max_retries {
                    return Err(TransferError::Permanent {
                        reason: format!(
                            "exhausted {} retries (last error: {reason})",
                            state.max_retries
                        ),
                    });
                }
                // retry_count >= 1 here, so the subtraction cannot underflow.
                let delay = crate::transfer::backoff_delay(state.retry_count - 1, server_hint);
                tracing::warn!(
                    "Transient error during streaming on attempt {}/{}: {reason}. \
                    Retrying after {delay:?}.",
                    state.retry_count,
                    state.max_retries
                );
                tokio::time::sleep(delay).await;
                // Re-open the source exactly where the consumer left off.
                let (reader, read_offset) = retry_transient!(
                    state.max_retries,
                    state
                        .source
                        .get_reader(state.url.clone(), state.total_bytes_yielded)
                )?;
                // The consumer has already seen every byte before this offset. Any
                // other starting point would duplicate or skip data.
                if read_offset.offset != state.total_bytes_yielded {
                    return Err(TransferError::Permanent {
                        reason: format!(
                            "Resume failed: source streaming from offset {} \
                            but {} bytes already yielded to consumer. \
                            Cannot replay already-consumed bytes.",
                            read_offset.offset, state.total_bytes_yielded,
                        ),
                    });
                }
                state.reader_stream = Box::pin(reader.stream_bytes());
            }
            Some(Err(e)) => return Err(e),
            None => return Ok(None),
        }
    }
}