use std::{
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
};
use futures::StreamExt;
use tokio::sync::mpsc::UnboundedSender;
use crate::{
condow_client::CondowClient,
config::Config,
errors::CondowError,
machinery::{part_request::PartRequestIterator, DownloadSpanGuard},
probe::Probe,
retry::ClientRetryWrapper,
streams::ChunkStreamItem,
};
use worker::{DownloaderContext, SequentialDownloader};
mod worker;
/// Dispatches part requests to a fixed set of [`SequentialDownloader`]s.
///
/// The set is created once in `new` (one downloader per unit of
/// `config.max_concurrency`) and `download` then hands each part request
/// to one of them, cycling through the set.
pub(crate) struct ParallelDownloader<P: Probe + Clone> {
/// The downloaders that part requests are enqueued into.
downloaders: Vec<SequentialDownloader>,
/// Incremented once for every part request that was enqueued; used as the
/// base offset when choosing which downloader to try first.
counter: usize,
/// Clones of this switch are handed to every downloader's context; it is
/// pushed by `download` when enqueueing a part request fails.
kill_switch: KillSwitch,
/// Shared configuration; `download` reads `max_buffers_full_delay_ms` from it.
config: Arc<Config>,
/// Notified via `queue_full` from the back-off branch of `download`.
probe: P,
/// Channel for the resulting stream items; `download` sends an error through
/// it when enqueueing fails.
results_sender: UnboundedSender<ChunkStreamItem>,
}
impl<P: Probe + Clone> ParallelDownloader<P> {
    /// Creates a dispatcher backed by `config.max_concurrency` sequential
    /// downloaders.
    ///
    /// Each downloader gets its own clone of `client`, `location`,
    /// `results_sender`, `probe` and `download_span_guard`. The kill switch,
    /// an atomic counter and the configuration are shared between all
    /// downloader contexts.
    pub fn new<C: CondowClient>(
        results_sender: UnboundedSender<ChunkStreamItem>,
        client: ClientRetryWrapper<C>,
        config: Config,
        location: C::Location,
        probe: P,
        download_span_guard: DownloadSpanGuard,
    ) -> Self {
        let started_at = Instant::now();
        let kill_switch = KillSwitch::new();
        let counter = Arc::new(AtomicUsize::new(0));
        let config = Arc::new(config);

        let downloaders: Vec<_> = (0..*config.max_concurrency)
            .map(|_| {
                SequentialDownloader::new(
                    client.clone(),
                    location.clone(),
                    config.buffer_size.into(),
                    DownloaderContext::new(
                        results_sender.clone(),
                        Arc::clone(&counter),
                        kill_switch.clone(),
                        probe.clone(),
                        started_at,
                        download_span_guard.clone(),
                        Arc::clone(&config),
                    ),
                )
            })
            .collect();

        Self {
            downloaders,
            // The dispatcher's own round-robin cursor; distinct from the
            // shared `AtomicUsize` handed to the downloader contexts above.
            counter: 0,
            kill_switch,
            config,
            probe,
            results_sender,
        }
    }

    /// Hands every part request yielded by `ranges` to one of the downloaders.
    ///
    /// For each request the downloaders are tried in round-robin order. When
    /// `enqueue` gives the request back (`Ok(Some(_))`) the next downloader
    /// is tried. Whenever the attempt number is a multiple of the number of
    /// downloaders, the probe is notified via `queue_full` and the task
    /// sleeps; the delay starts at zero and grows by one millisecond per
    /// round, capped at `max_buffers_full_delay_ms`.
    ///
    /// The method returns early, after pushing the kill switch and sending an
    /// error through the results channel, when
    /// * there is a part request but no downloader to take it, or
    /// * `enqueue` reports an error.
    pub async fn download(&mut self, ranges: PartRequestIterator) {
        let mut ranges_stream = ranges.into_stream();
        let max_buffers_full_delay: Duration = self.config.max_buffers_full_delay_ms.into();
        let n_downloaders = self.downloaders.len();

        while let Some(mut range_request) = ranges_stream.next().await {
            // Without this guard the modulo operations below would panic
            // with a remainder-by-zero when `max_concurrency` is 0.
            if n_downloaders == 0 {
                self.kill_switch.push_the_button();
                let _ = self.results_sender.send(Err(CondowError::new_other(
                    "failed to send part request - no downloaders available",
                )));
                return;
            }

            let mut attempt = 1;
            let mut current_buffers_full_delay = Duration::from_secs(0);

            loop {
                // NOTE(review): `attempt` starts at 1, so with exactly one
                // downloader this branch is taken before the first enqueue
                // attempt - confirm that this is intended.
                if attempt % n_downloaders == 0 {
                    self.probe.queue_full();
                    tokio::time::sleep(current_buffers_full_delay).await;
                    current_buffers_full_delay = max_buffers_full_delay
                        .min(current_buffers_full_delay + Duration::from_millis(1));
                }

                let idx = self.counter + attempt;
                let downloader = &mut self.downloaders[idx % n_downloaders];

                match downloader.enqueue(range_request) {
                    // The downloader accepted the request.
                    Ok(None) => break,
                    // The request was handed back; retry with the next downloader.
                    Ok(Some(msg)) => {
                        range_request = msg;
                    }
                    Err(()) => {
                        self.kill_switch.push_the_button();
                        // Best effort: the consumer of the results may be gone as well.
                        let _ = self.results_sender.send(Err(CondowError::new_other(
                            "failed to send part request - receiver gone",
                        )));
                        return;
                    }
                }

                attempt += 1;
            }

            self.counter += 1;
        }
    }
}
/// A one-way, shareable "stop" flag.
///
/// Cloning a `KillSwitch` yields a handle to the same underlying flag, so
/// pushing the button through any clone is observable through all of them.
/// There is no way to reset the flag once it has been set.
#[derive(Clone)]
pub(crate) struct KillSwitch {
    is_pushed: Arc<AtomicBool>,
}

impl KillSwitch {
    /// Creates a switch that has not been pushed yet.
    pub fn new() -> Self {
        let is_pushed = Arc::new(AtomicBool::new(false));
        Self { is_pushed }
    }

    /// Returns `true` once the button has been pushed on this switch or on
    /// any of its clones.
    pub fn is_pushed(&self) -> bool {
        let flag: &AtomicBool = &self.is_pushed;
        flag.load(Ordering::SeqCst)
    }

    /// Sets the flag; pushing more than once has no further effect.
    pub fn push_the_button(&self) {
        let flag: &AtomicBool = &self.is_pushed;
        flag.store(true, Ordering::SeqCst);
    }
}