use crate::{
StreamData,
endpoint::CursorEndpoint,
error::Error,
reader::{ReaderNotification, StreamReader},
};
use async_std::channel::Sender;
use async_trait::async_trait;
use std::io::{Cursor, Read};
#[async_trait]
impl StreamReader for CursorEndpoint {
async fn read_stream(
&self,
_path: &str,
sender: Sender<StreamData>,
channel: &dyn ReaderNotification,
) -> Result<u64, Error> {
let mut total_read_bytes: u64 = 0;
let mut stream = Cursor::new(self.content());
let stream_length = self.content().len() as u64;
sender.send(StreamData::Size(stream_length)).await?;
loop {
if channel.is_stopped() {
sender.send(StreamData::Stop).await?;
return Ok(total_read_bytes);
}
let mut buffer = vec![0; 30 * 1024];
let read_size = stream.read(&mut buffer)?;
total_read_bytes += read_size as u64;
if read_size == 0 {
sender.send(StreamData::Eof).await?;
return Ok(total_read_bytes);
}
if let Err(error) = sender
.send(StreamData::Data(buffer[0..read_size].to_vec()))
.await
{
if channel.is_stopped() && sender.is_closed() {
log::warn!("Data channel closed: could not send {read_size} read bytes.");
return Ok(total_read_bytes);
}
return Err(error.into());
}
}
}
}