use std::sync::Arc;
use std::sync::atomic::Ordering;
use bytes::Bytes;
use tokio::sync::Notify;
use tokio::sync::mpsc::UnboundedReceiver;
use tracing::info;
use super::super::error::Result;
use super::super::file_reconstructor::FileReconstructor;
use super::super::run_state::RunState;
use super::unordered_writer::{CompletedTerm, UnorderedWriterProgress};
/// Pull-based stream over the terms produced by a file reconstruction run.
///
/// Each successfully received term is handed to the caller as an
/// `(offset, data)` pair, where `offset` is the start of the term's byte range.
/// NOTE(review): as the name suggests, items are presumably delivered in
/// completion order rather than file order — confirm against `UnorderedWriter`.
///
/// The reconstruction task is spawned in `new` but stays parked until the
/// stream is started (explicitly via `start`, or implicitly by the first
/// `next` / `blocking_next` call) or cancelled.
pub struct UnorderedDownloadStream {
/// Shared progress counters; read by `bytes_in_progress` and `terms_in_progress`.
progress: Arc<UnorderedWriterProgress>,
/// Receiving end of the channel returned by `UnorderedWriter::new_streaming`;
/// carries either a completed term or an error.
receiver: UnboundedReceiver<Result<CompletedTerm>>,
/// Set once the channel has reported end-of-stream, the run was observed as
/// cancelled by `next`, or `cancel` was called. Further reads return `Ok(None)`.
finished: bool,
/// Run state shared with the reconstruction task; used here for cancellation,
/// error checking and progress reporting.
run_state: Arc<RunState>,
/// Wake-up signal for the parked reconstruction task. `Some` until the stream
/// is started or cancelled, `None` afterwards.
start_signal: Option<Arc<Notify>>,
}
impl UnorderedDownloadStream {
pub(crate) fn new(reconstructor: FileReconstructor, run_state: Arc<RunState>) -> Self {
use super::unordered_writer::UnorderedWriter;
let (writer, receiver, progress) = UnorderedWriter::new_streaming(run_state.clone());
let start_signal = Arc::new(Notify::new());
let signal = start_signal.clone();
let rs = run_state.clone();
tokio::spawn(async move {
signal.notified().await;
info!(file_hash = %rs.file_hash(), "Starting unordered download stream");
let _ = reconstructor.run(writer, rs, true).await;
});
Self {
progress,
receiver,
finished: false,
run_state,
start_signal: Some(start_signal),
}
}
pub(crate) fn abort_callback(&self) -> Box<dyn Fn() + Send + Sync> {
let run_state = self.run_state.clone();
let start_signal = self.start_signal.clone();
Box::new(move || {
run_state.cancel();
if let Some(signal) = start_signal.as_ref() {
signal.notify_one();
}
})
}
pub fn start(&mut self) {
if let Some(signal) = self.start_signal.take() {
signal.notify_one();
}
}
fn ensure_started(&mut self) {
if self.start_signal.is_some() {
self.start();
}
}
fn cancel_reconstruction(&self) {
self.run_state.cancel();
if let Some(signal) = self.start_signal.as_ref() {
signal.notify_one();
}
}
pub fn blocking_next(&mut self) -> Result<Option<(u64, Bytes)>> {
if self.finished {
return Ok(None);
}
self.ensure_started();
match self.receiver.blocking_recv() {
Some(result) => self.process_term(result),
None => {
self.finished = true;
self.run_state.check_error()?;
Ok(None)
},
}
}
pub async fn next(&mut self) -> Result<Option<(u64, Bytes)>> {
if self.finished {
return Ok(None);
}
self.ensure_started();
if let Ok(result) = self.receiver.try_recv() {
return self.process_term(result);
}
let next_item = tokio::select! {
biased;
recv = self.receiver.recv() => recv,
_ = self.run_state.cancelled() => None,
};
match next_item {
Some(result) => self.process_term(result),
None => {
self.finished = true;
self.run_state.check_error()?;
Ok(None)
},
}
}
fn process_term(&mut self, result: Result<CompletedTerm>) -> Result<Option<(u64, Bytes)>> {
let term = result?;
self.run_state.report_bytes_written(term.data.len() as u64);
let offset = term.byte_range.start;
let data = term.data;
drop(term.permit);
Ok(Some((offset, data)))
}
pub fn cancel(&mut self) {
self.cancel_reconstruction();
let _ = self.start_signal.take();
self.receiver.close();
self.finished = true;
}
pub fn total_bytes_expected(&self) -> u64 {
self.run_state
.progress_updater()
.map(|u| u.item().total_bytes.load(Ordering::Acquire))
.unwrap_or(0)
}
pub fn bytes_in_progress(&self) -> u64 {
self.progress.bytes_in_progress()
}
pub fn bytes_completed(&self) -> u64 {
self.run_state
.progress_updater()
.map(|u| u.total_bytes_completed())
.unwrap_or(0)
}
pub fn terms_in_progress(&self) -> u64 {
self.progress.terms_in_progress()
}
pub fn is_complete(&self) -> bool {
self.finished
}
}
/// Dropping the stream cancels the run, wakes the reconstruction task if it is
/// still parked on the start signal, and closes the receiving end of the channel.
impl Drop for UnorderedDownloadStream {
    fn drop(&mut self) {
        // Cancel first so that a task woken below observes the cancelled state.
        self.run_state.cancel();
        if let Some(signal) = &self.start_signal {
            // The task was never started; wake it so it does not wait forever.
            signal.notify_one();
        }
        self.receiver.close();
    }
}