use crate::dir::Dir;
use crate::error::ZeroFsError;
use crate::types::DirEntry;
use futures_core::Stream;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, ready};
type BoxFut<T> = Pin<Box<dyn Future<Output = T> + Send>>;
pub struct DirStream {
dir: Arc<Dir>,
buf: VecDeque<DirEntry>,
fetch: Option<BoxFut<Result<Vec<DirEntry>, ZeroFsError>>>,
done: bool,
}
impl DirStream {
pub(crate) fn new(dir: Arc<Dir>) -> Self {
Self {
dir,
buf: VecDeque::new(),
fetch: None,
done: false,
}
}
}
impl Stream for DirStream {
type Item = Result<DirEntry, ZeroFsError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let Some(entry) = self.buf.pop_front() {
return Poll::Ready(Some(Ok(entry)));
}
if self.done {
return Poll::Ready(None);
}
if self.fetch.is_none() {
let dir = Arc::clone(&self.dir);
self.fetch = Some(Box::pin(async move { dir.next_batch(None).await }));
}
let batch = ready!(self.fetch.as_mut().unwrap().as_mut().poll(cx));
self.fetch = None;
match batch {
Ok(batch) if batch.is_empty() => {
self.done = true;
return Poll::Ready(None);
}
Ok(batch) => self.buf.extend(batch),
Err(e) => {
self.done = true;
return Poll::Ready(Some(Err(e)));
}
}
}
}
}