use futures::StreamExt;
use crate::{
condow_client::CondowClient,
machinery::{configure_download::DownloadConfiguration, DownloadSpanGuard},
probe::Probe,
retry::ClientRetryWrapper,
streams::{BytesStream, ChunkStream},
};
use super::active_pull;
use parts_bytes_stream::PartsBytesStream;
pub mod part_bytes_stream;
pub mod parts_bytes_stream;
/// Downloads the configured parts one after another and exposes them as a
/// [`ChunkStream`].
///
/// A `DownloadPartsSeq` is built from the client together with the location
/// and part requests of `configuration`. If `ensure_active_pull` is set in the
/// configuration, that stream is handed to `active_pull` and the returned
/// `ChunkStream` is created from whatever `active_pull` yields; otherwise the
/// stream is boxed and wrapped directly.
pub(crate) fn download_chunks_sequentially<C: CondowClient, P: Probe + Clone>(
    client: ClientRetryWrapper<C>,
    configuration: DownloadConfiguration<C::Location>,
    probe: P,
    download_span_guard: DownloadSpanGuard,
) -> ChunkStream {
    // Read everything that only needs a borrow before the configuration is
    // taken apart below.
    let size_hint = configuration.bytes_hint();
    let log_as_debug = configuration.config.log_download_messages_as_debug;
    let pull_actively = *configuration.config.ensure_active_pull;

    let parts = download_parts_seq::DownloadPartsSeq::from_client(
        client,
        configuration.location,
        configuration.part_requests,
        probe.clone(),
        log_as_debug,
        download_span_guard,
    );

    if !pull_actively {
        return ChunkStream::from_stream(parts.boxed(), size_hint);
    }

    let actively_pulled = active_pull(parts, probe, log_as_debug);
    ChunkStream::from_receiver(actively_pulled, size_hint)
}
/// Downloads the configured parts one after another and exposes them as a
/// [`BytesStream`].
///
/// A `PartsBytesStream` is built from the client together with the location
/// and part requests of `configuration`; it receives the shared span of
/// `download_span_guard`. If `ensure_active_pull` is set in the configuration,
/// the stream is handed to `active_pull` and the result is wrapped with
/// `BytesStream::new_tokio_receiver`; otherwise it is wrapped with
/// `BytesStream::new`.
pub(crate) fn download_bytes_sequentially<C: CondowClient, P: Probe + Clone>(
    client: ClientRetryWrapper<C>,
    configuration: DownloadConfiguration<C::Location>,
    probe: P,
    download_span_guard: DownloadSpanGuard,
) -> BytesStream {
    // Read everything that only needs a borrow before the configuration is
    // taken apart below.
    let size_hint = configuration.bytes_hint();
    let log_as_debug = configuration.config.log_download_messages_as_debug;
    let pull_actively = *configuration.config.ensure_active_pull;

    let parts_bytes = PartsBytesStream::from_client(
        client,
        configuration.location,
        configuration.part_requests,
        probe.clone(),
        log_as_debug,
        download_span_guard.shared_span(),
    );

    if pull_actively {
        BytesStream::new_tokio_receiver(active_pull(parts_bytes, probe, log_as_debug), size_hint)
    } else {
        BytesStream::new(parts_bytes, size_hint)
    }
}
mod download_parts_seq {
use std::{
task::Poll,
time::{Duration, Instant},
};
use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
use pin_project_lite::pin_project;
use crate::{
condow_client::CondowClient,
config::LogDownloadMessagesAsDebug,
errors::CondowError,
machinery::{download::PartChunksStream, part_request::PartRequest, DownloadSpanGuard},
probe::Probe,
retry::ClientRetryWrapper,
streams::{BytesStream, ChunkStreamItem},
InclusiveRange,
};
/// Progress of a `DownloadPartsSeq`.
enum State<P: Probe> {
/// A part is being streamed; holds the chunk stream of that part.
Streaming(PartChunksStream<P>),
/// No part stream is held. `poll_next` answers `Poll::Ready(None)` in this
/// state. It is entered when there are no part requests at construction,
/// after an error was yielded, and after the last part has ended.
Finished,
}
pin_project! {
/// A stream of `ChunkStreamItem`s that works through its part requests one
/// at a time: the next part stream is only created once the current one has
/// ended.
pub (crate) struct DownloadPartsSeq<P: Probe> {
// Given a range, returns the future resolving to the `BytesStream` for it.
get_part_stream: Box<dyn Fn(InclusiveRange) -> BoxFuture<'static, Result<BytesStream, CondowError>> + Send + 'static>,
// Part requests that have not been started yet.
part_requests: Box<dyn Iterator<Item=PartRequest> + Send + 'static>,
// Either the stream of the part currently being downloaded or `Finished`.
state: State<P>,
// Notified on completion and failure; a clone is passed to each part stream.
probe: P,
// Taken at construction; its elapsed time is reported to the probe.
download_started_at: Instant,
// Receives the "download completed" / "download failed" messages.
log_dl_msg_dbg: LogDownloadMessagesAsDebug,
// Provides the span passed to each newly created part stream.
download_span_guard: DownloadSpanGuard,
}
}
impl<P> DownloadPartsSeq<P>
where
    P: Probe + Clone,
{
    /// Creates the stream from a function that fetches the bytes of a range
    /// and an iterator over the parts to request.
    ///
    /// The first part request is taken right away and its part stream is
    /// created. If there is no part request at all, the probe is told that
    /// the download completed with a duration of zero, the message
    /// "download (empty) completed" is logged and the stream starts out
    /// finished.
    pub fn new<I, L, F>(
        get_part_stream: F,
        mut part_requests: I,
        probe: P,
        log_dl_msg_dbg: L,
        download_span_guard: DownloadSpanGuard,
    ) -> Self
    where
        I: Iterator<Item = PartRequest> + Send + 'static,
        L: Into<LogDownloadMessagesAsDebug>,
        F: Fn(InclusiveRange) -> BoxFuture<'static, Result<BytesStream, CondowError>>
            + Send
            + 'static,
    {
        let log_dl_msg_dbg: LogDownloadMessagesAsDebug = log_dl_msg_dbg.into();

        // Decide on the initial state first; everything else is the same for
        // the empty and the non-empty case.
        let state = match part_requests.next() {
            Some(first_request) => State::Streaming(PartChunksStream::new(
                &get_part_stream,
                first_request,
                probe.clone(),
                download_span_guard.span(),
            )),
            None => {
                probe.download_completed(Duration::ZERO);
                log_dl_msg_dbg.log("download (empty) completed");
                State::Finished
            }
        };

        Self {
            get_part_stream: Box::new(get_part_stream),
            part_requests: Box::new(part_requests),
            state,
            probe,
            download_started_at: Instant::now(),
            log_dl_msg_dbg,
            download_span_guard,
        }
    }

    /// Creates the stream on top of a client.
    ///
    /// Each range is fetched by calling `download` on `client` with a clone
    /// of `location` and a clone of `probe`.
    pub fn from_client<C, I, L>(
        client: ClientRetryWrapper<C>,
        location: C::Location,
        part_requests: I,
        probe: P,
        log_dl_msg_dbg: L,
        download_span_guard: DownloadSpanGuard,
    ) -> Self
    where
        I: Iterator<Item = PartRequest> + Send + 'static,
        L: Into<LogDownloadMessagesAsDebug>,
        C: CondowClient,
    {
        // The closure owns its own probe so that `probe` itself can still be
        // passed on to `new`.
        let request_probe = probe.clone();
        let get_part_stream = move |range: InclusiveRange| {
            client
                .download(location.clone(), range, request_probe.clone())
                .boxed()
        };

        Self::new(
            get_part_stream,
            part_requests,
            probe,
            log_dl_msg_dbg,
            download_span_guard,
        )
    }
}
impl<P> Stream for DownloadPartsSeq<P>
where
    P: Probe + Clone,
{
    type Item = ChunkStreamItem;

    /// Polls the part stream currently in progress.
    ///
    /// * A chunk is passed through and the part stream is kept.
    /// * An error is reported to the probe and logged, then yielded; the
    ///   stream is finished afterwards.
    /// * When the part stream ends, the stream for the next part request is
    ///   created, the task is woken and `Pending` is returned so that the new
    ///   stream gets polled on the next call. Without a further part request
    ///   the probe is told that the download completed and the stream ends.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = self.project();

        // Take the part stream out and leave `Finished` behind. Every path
        // that wants to continue has to put a stream back explicitly.
        let mut part_stream = match std::mem::replace(this.state, State::Finished) {
            State::Streaming(part_stream) => part_stream,
            State::Finished => return Poll::Ready(None),
        };

        let polled = part_stream.poll_next_unpin(cx);
        match polled {
            Poll::Pending => {
                *this.state = State::Streaming(part_stream);
                Poll::Pending
            }
            Poll::Ready(Some(Ok(chunk))) => {
                *this.state = State::Streaming(part_stream);
                Poll::Ready(Some(Ok(chunk)))
            }
            Poll::Ready(Some(Err(err))) => {
                let elapsed = this.download_started_at.elapsed();
                this.probe.download_failed(Some(elapsed));
                this.log_dl_msg_dbg.log(format!("download failed: {err}"));
                *this.state = State::Finished;
                Poll::Ready(Some(Err(err)))
            }
            Poll::Ready(None) => match this.part_requests.next() {
                Some(next_request) => {
                    let next_stream = PartChunksStream::new(
                        this.get_part_stream,
                        next_request,
                        this.probe.clone(),
                        this.download_span_guard.span(),
                    );
                    *this.state = State::Streaming(next_stream);
                    // The new part stream has not been polled yet, so nothing
                    // would wake this task; request another poll ourselves.
                    cx.waker().wake_by_ref();
                    Poll::Pending
                }
                None => {
                    let elapsed = this.download_started_at.elapsed();
                    this.probe.download_completed(elapsed);
                    this.log_dl_msg_dbg.log("download completed");
                    *this.state = State::Finished;
                    Poll::Ready(None)
                }
            },
        }
    }
}
#[cfg(test)]
mod tests {
use futures::StreamExt;
use crate::{
condow_client::{
failing_client_simulator::FailingClientSimulatorBuilder, IgnoreLocation,
},
errors::{CondowError, CondowErrorKind},
machinery::part_request::PartRequestIterator,
retry::ClientRetryWrapper,
streams::BytesHint,
test_utils::TestCondowClient,
ChunkStream,
};
use super::DownloadPartsSeq;
/// Downloads bytes `0..=99` with every part size from 1 to 99 and compares
/// the result with the same range of the test client's data.
#[tokio::test]
async fn get_ranges() {
    let client = ClientRetryWrapper::new(TestCondowClient::new().max_jitter_ms(5), None);

    for part_size in 1..100 {
        let parts = DownloadPartsSeq::from_client(
            client.clone(),
            IgnoreLocation,
            PartRequestIterator::new(0..=99, part_size),
            (),
            true,
            Default::default(),
        );

        let chunk_stream = ChunkStream::from_stream(parts.boxed(), BytesHint::new_no_hint());
        let downloaded = chunk_stream.into_vec().await.unwrap();

        let expected = &client.inner_client().data_slice()[0..=99];
        assert_eq!(downloaded, expected, "part_size: {part_size}");
    }
}
/// Downloads a 1000 byte blob in parts of 13 bytes from a client that is
/// scripted to fail on some requests and in some streams, with retries
/// enabled, and expects the complete blob to arrive.
#[tokio::test]
async fn failures_with_retries() {
    let blob: Vec<u8> = (0u32..=999).map(|value| value as u8).collect();

    let failing_client = FailingClientSimulatorBuilder::default()
        .blob(blob.clone())
        .chunk_size(7)
        .responses()
        .success()
        .failure(CondowErrorKind::Io)
        .success()
        .success_with_stream_failure(3)
        .success()
        .failures([CondowErrorKind::Io, CondowErrorKind::Remote])
        .success_with_stream_failure(6)
        .failure(CondowError::new_remote("this did not work"))
        .success_with_stream_failure(2)
        .finish();
    let client = ClientRetryWrapper::new(failing_client, Some(Default::default()));

    let parts = DownloadPartsSeq::from_client(
        client.clone(),
        IgnoreLocation,
        PartRequestIterator::new(0..=999, 13),
        (),
        true,
        Default::default(),
    );

    let chunk_stream = ChunkStream::from_stream(parts.boxed(), BytesHint::new_no_hint());
    let downloaded = chunk_stream.into_vec().await.unwrap();

    assert_eq!(downloaded, blob);
}
/// Downloads the whole test blob in parts of 13 bytes from a client
/// configured with `pending_on_request_n_times(1)`.
#[tokio::test]
async fn pending_on_request() {
    let test_client = TestCondowClient::new().pending_on_request_n_times(1);
    let blob = test_client.data_slice().to_vec();
    let client = ClientRetryWrapper::new(test_client, Default::default());

    let parts = DownloadPartsSeq::from_client(
        client.clone(),
        IgnoreLocation,
        PartRequestIterator::new(..=(blob.len() as u64 - 1), 13),
        (),
        true,
        Default::default(),
    );

    let chunk_stream = ChunkStream::from_stream(parts.boxed(), BytesHint::new_no_hint());
    let downloaded = chunk_stream.into_vec().await.unwrap();

    assert_eq!(downloaded, blob);
}
/// Downloads the whole test blob in parts of 13 bytes from a client
/// configured with `pending_on_stream_n_times(1)`.
#[tokio::test]
async fn pending_on_stream() {
    let test_client = TestCondowClient::new().pending_on_stream_n_times(1);
    let blob = test_client.data_slice().to_vec();
    let client = ClientRetryWrapper::new(test_client, Default::default());

    let parts = DownloadPartsSeq::from_client(
        client.clone(),
        IgnoreLocation,
        PartRequestIterator::new(..=(blob.len() as u64 - 1), 13),
        (),
        true,
        Default::default(),
    );

    let chunk_stream = ChunkStream::from_stream(parts.boxed(), BytesHint::new_no_hint());
    let downloaded = chunk_stream.into_vec().await.unwrap();

    assert_eq!(downloaded, blob);
}
/// Downloads the whole test blob in parts of 13 bytes from a client
/// configured with both `pending_on_request_n_times(1)` and
/// `pending_on_stream_n_times(1)`.
#[tokio::test]
async fn pending_on_request_and_stream() {
    let test_client = TestCondowClient::new()
        .pending_on_request_n_times(1)
        .pending_on_stream_n_times(1);
    let blob = test_client.data_slice().to_vec();
    let client = ClientRetryWrapper::new(test_client, Default::default());

    let parts = DownloadPartsSeq::from_client(
        client.clone(),
        IgnoreLocation,
        PartRequestIterator::new(..=(blob.len() as u64 - 1), 13),
        (),
        true,
        Default::default(),
    );

    let chunk_stream = ChunkStream::from_stream(parts.boxed(), BytesHint::new_no_hint());
    let downloaded = chunk_stream.into_vec().await.unwrap();

    assert_eq!(downloaded, blob);
}
}
}