use std::sync::Arc;
use bytes::Bytes;
use tokio::sync::Notify;
use tokio::sync::mpsc::UnboundedReceiver;
use tracing::info;
use super::super::error::{FileReconstructionError, Result};
use super::super::file_reconstructor::FileReconstructor;
use super::super::run_state::RunState;
use super::sequential_writer::{SequentialRetrievalItem, SequentialWriter};
/// Pull-based handle over a file reconstruction whose output is consumed chunk by chunk.
///
/// The reconstruction task is spawned in `new` but stays parked until it is released
/// through `start_signal`; chunks are then pulled with `next` or `blocking_next`.
pub struct DownloadStream {
/// Receiving half of the channel returned by `SequentialWriter::new_streaming`;
/// yields `SequentialRetrievalItem` values in the order they were queued.
receiver: UnboundedReceiver<SequentialRetrievalItem>,
/// Set once a `Finish` item or a closed channel has been observed, or after `cancel`;
/// from then on `next` / `blocking_next` return `Ok(None)` without touching the channel.
finished: bool,
/// Run state shared with the reconstruction task; used here to request cancellation,
/// to check for recorded errors and to report the number of bytes handed out.
run_state: Arc<RunState>,
/// Signal the spawned task waits on before it runs the reconstructor.
/// `Some` until `start` or `cancel` takes it.
start_signal: Option<Arc<Notify>>,
}
impl DownloadStream {
pub(crate) fn new(reconstructor: FileReconstructor, run_state: Arc<RunState>) -> Self {
let (data_writer, receiver) = SequentialWriter::new_streaming(run_state.clone());
let start_signal = Arc::new(Notify::new());
let signal = start_signal.clone();
let rs = run_state.clone();
tokio::spawn(async move {
signal.notified().await;
info!(file_hash = %rs.file_hash(), "Starting download stream");
let _ = reconstructor.run(data_writer, rs, true).await;
});
Self {
receiver,
finished: false,
run_state,
start_signal: Some(start_signal),
}
}
pub(crate) fn abort_callback(&self) -> Box<dyn Fn() + Send + Sync> {
let run_state = self.run_state.clone();
let start_signal = self.start_signal.clone();
Box::new(move || {
run_state.cancel();
if let Some(signal) = start_signal.as_ref() {
signal.notify_one();
}
})
}
pub fn start(&mut self) {
if let Some(signal) = self.start_signal.take() {
signal.notify_one();
}
}
fn ensure_started(&mut self) {
if self.start_signal.is_some() {
self.start();
}
}
fn cancel_reconstruction(&self) {
self.run_state.cancel();
if let Some(signal) = self.start_signal.as_ref() {
signal.notify_one();
}
}
pub fn blocking_next(&mut self) -> Result<Option<Bytes>> {
if self.finished {
return Ok(None);
}
self.ensure_started();
match self.receiver.blocking_recv() {
Some(SequentialRetrievalItem::Data { receiver, permit }) => {
let data = match receiver.blocking_recv() {
Ok(data) => data,
Err(_) => {
self.run_state.check_error()?;
return Err(FileReconstructionError::InternalWriterError(
"Data sender was dropped before sending data.".to_string(),
));
},
};
self.run_state.report_bytes_written(data.len() as u64);
drop(permit);
Ok(Some(data))
},
Some(SequentialRetrievalItem::Finish) | None => {
self.finished = true;
self.run_state.check_error()?;
Ok(None)
},
}
}
pub async fn next(&mut self) -> Result<Option<Bytes>> {
if self.finished {
return Ok(None);
}
self.ensure_started();
let item = if let Ok(item) = self.receiver.try_recv() {
Some(item)
} else {
tokio::select! {
biased;
recv = self.receiver.recv() => recv,
_ = self.run_state.cancelled() => None,
}
};
match item {
Some(SequentialRetrievalItem::Data { receiver, permit }) => {
let data = match receiver.await {
Ok(data) => data,
Err(_) => {
self.run_state.check_error()?;
return Err(FileReconstructionError::InternalWriterError(
"Data sender was dropped before sending data.".to_string(),
));
},
};
self.run_state.report_bytes_written(data.len() as u64);
drop(permit);
Ok(Some(data))
},
Some(SequentialRetrievalItem::Finish) | None => {
self.finished = true;
self.run_state.check_error()?;
Ok(None)
},
}
}
pub fn cancel(&mut self) {
self.cancel_reconstruction();
let _ = self.start_signal.take();
self.receiver.close();
self.finished = true;
}
}
impl Drop for DownloadStream {
    /// Tears the stream down when the handle goes away.
    ///
    /// Cancellation is requested on the run state, the start signal (if it is still
    /// held) is notified so the spawned task is not left parked, and the channel is
    /// closed.
    fn drop(&mut self) {
        self.run_state.cancel();
        if let Some(gate) = &self.start_signal {
            gate.notify_one();
        }
        self.receiver.close();
    }
}