use std::sync::Arc;
use bytes::Bytes;
use tracing::{debug, info};
use xet_data::DataError;
use xet_data::processing::{DownloadStream, FileDownloadSession, UnorderedDownloadStream};
use xet_data::progress_tracking::{ItemProgressReport, UniqueID};
use super::errors::SessionError;
use super::task_runtime::TaskRuntime;
/// Session-level handle around a [`DownloadStream`].
///
/// Bundles the wrapped stream with the download session, the stream's id and
/// the task runtime that the methods of this type use for logging, progress
/// lookup, cancellation and cleanup on drop.
pub struct XetDownloadStream {
// The wrapped stream; `start`, `next`, `blocking_next` and `cancel` delegate to it.
inner: DownloadStream,
// Session queried for progress reports and used to unregister the abort callback on drop.
download_session: Arc<FileDownloadSession>,
// Identifier used in log fields and as the key for session lookups.
id: UniqueID,
// Runtime whose subtree is cancelled when the stream is cancelled.
task_runtime: Arc<TaskRuntime>,
}
impl XetDownloadStream {
pub(super) fn new(
inner: DownloadStream,
download_session: Arc<FileDownloadSession>,
id: UniqueID,
task_runtime: Arc<TaskRuntime>,
) -> Self {
Self {
inner,
download_session,
id,
task_runtime,
}
}
pub fn start(&mut self) {
info!(stream_id = %self.id, "Download stream start");
self.inner.start();
}
pub async fn next(&mut self) -> Result<Option<Bytes>, SessionError> {
debug!(stream_id = %self.id, "Download stream next");
self.inner.next().await.map_err(|e| SessionError::from(DataError::from(e)))
}
pub fn blocking_next(&mut self) -> Result<Option<Bytes>, SessionError> {
debug!(stream_id = %self.id, "Download stream next");
self.inner.blocking_next().map_err(|e| SessionError::from(DataError::from(e)))
}
pub fn cancel(&mut self) {
info!(stream_id = %self.id, "Download stream cancel");
let _ = self.task_runtime.cancel_subtree();
self.inner.cancel();
}
pub fn progress(&self) -> ItemProgressReport {
self.download_session
.item_report(self.id)
.expect("progress item was registered at stream creation and is never removed")
}
}
impl Drop for XetDownloadStream {
    /// Asks the download session to unregister the stream abort callback stored
    /// under this stream's id.
    fn drop(&mut self) {
        let stream_id = self.id;
        self.download_session.unregister_stream_abort_callback(stream_id);
    }
}
/// Session-level handle around an [`UnorderedDownloadStream`].
///
/// Bundles the wrapped stream with the download session, the stream's id and
/// the task runtime that the methods of this type use for logging, progress
/// lookup, cancellation and cleanup on drop.
pub struct XetUnorderedDownloadStream {
// The wrapped stream; `start`, `next`, `blocking_next` and `cancel` delegate to it.
inner: UnorderedDownloadStream,
// Session queried for progress reports and used to unregister the abort callback on drop.
download_session: Arc<FileDownloadSession>,
// Identifier used in log fields and as the key for session lookups.
id: UniqueID,
// Runtime whose subtree is cancelled when the stream is cancelled.
task_runtime: Arc<TaskRuntime>,
}
impl XetUnorderedDownloadStream {
pub(super) fn new(
inner: UnorderedDownloadStream,
download_session: Arc<FileDownloadSession>,
id: UniqueID,
task_runtime: Arc<TaskRuntime>,
) -> Self {
Self {
inner,
download_session,
id,
task_runtime,
}
}
pub fn start(&mut self) {
info!(stream_id = %self.id, "Download stream start");
self.inner.start();
}
pub async fn next(&mut self) -> Result<Option<(u64, Bytes)>, SessionError> {
debug!(stream_id = %self.id, "Download stream next");
self.inner.next().await.map_err(|e| SessionError::from(DataError::from(e)))
}
pub fn blocking_next(&mut self) -> Result<Option<(u64, Bytes)>, SessionError> {
debug!(stream_id = %self.id, "Download stream next");
self.inner.blocking_next().map_err(|e| SessionError::from(DataError::from(e)))
}
pub fn cancel(&mut self) {
info!(stream_id = %self.id, "Download stream cancel");
let _ = self.task_runtime.cancel_subtree();
self.inner.cancel();
}
pub fn progress(&self) -> ItemProgressReport {
self.download_session
.item_report(self.id)
.expect("progress item was registered at stream creation and is never removed")
}
}
impl Drop for XetUnorderedDownloadStream {
    /// Asks the download session to unregister the stream abort callback stored
    /// under this stream's id.
    fn drop(&mut self) {
        let stream_id = self.id;
        self.download_session.unregister_stream_abort_callback(stream_id);
    }
}