use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::SystemTime;
use bytes::Bytes;
use futures::Stream;
use pin_project_lite::pin_project;
use tokio::sync::broadcast;
use crate::TransferEvent;
pub struct ByteStream {
bytes: Bytes,
offset: usize,
}
impl ByteStream {
pub fn new(bytes: Bytes) -> Self {
Self { bytes, offset: 0 }
}
}
impl Stream for ByteStream {
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
const CHUNK_SIZE: usize = 64 * 1024;
if self.offset == self.bytes.len() {
return Poll::Ready(None);
}
let bytes = self
.bytes
.slice(self.offset..(self.offset + CHUNK_SIZE).min(self.bytes.len()));
self.offset += bytes.len();
Poll::Ready(Some(Ok(bytes)))
}
fn size_hint(&self) -> (usize, Option<usize>) {
(0, Some(self.bytes.len()))
}
}
pin_project! {
pub struct TransferStream<S> {
#[pin]
stream: S,
id: u64,
block: u64,
transferred: u64,
last: Option<SystemTime>,
events: Option<broadcast::Sender<TransferEvent>>,
finished: bool,
}
impl<S> PinnedDrop for TransferStream<S> {
fn drop(this: Pin<&mut Self>) {
if let Some(events) = &this.events {
events
.send(TransferEvent::BlockProgress {
id: this.id,
block: this.block,
transferred: this.transferred,
})
.ok();
}
}
}
}
impl<S> TransferStream<S> {
pub fn new(
stream: S,
id: u64,
block: u64,
offset: u64,
events: Option<broadcast::Sender<TransferEvent>>,
) -> Self
where
S: Stream<Item = std::io::Result<Bytes>>,
{
Self {
stream,
id,
block,
transferred: offset,
last: None,
events,
finished: false,
}
}
}
impl<S> Stream for TransferStream<S>
where
S: Stream<Item = std::io::Result<Bytes>>,
{
type Item = std::io::Result<Bytes>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
const UPDATE_INTERVAL: Duration = Duration::from_millis(50);
if self.finished {
return Poll::Ready(None);
}
let this = self.project();
match this.stream.poll_next(cx) {
Poll::Ready(Some(Ok(bytes))) => {
let now = SystemTime::now();
let update = this
.last
.map(|last| {
now.duration_since(last).unwrap_or(Duration::ZERO) >= UPDATE_INTERVAL
})
.unwrap_or(true);
*this.transferred += u64::try_from(bytes.len()).unwrap();
if update && let Some(events) = &this.events {
*this.last = Some(now);
events
.send(TransferEvent::BlockProgress {
id: *this.id,
block: *this.block,
transferred: *this.transferred,
})
.ok();
}
Poll::Ready(Some(Ok(bytes)))
}
Poll::Ready(Some(Err(e))) => {
*this.finished = true;
Poll::Ready(Some(Err(e)))
}
Poll::Ready(None) => {
*this.finished = true;
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}