1use crate::dir::Dir;
6use crate::error::ZeroFsError;
7use crate::types::DirEntry;
8use futures_core::Stream;
9use std::collections::VecDeque;
10use std::future::Future;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll, ready};
14
15type BoxFut<T> = Pin<Box<dyn Future<Output = T> + Send>>;
16
17pub struct DirStream {
21 dir: Arc<Dir>,
22 buf: VecDeque<DirEntry>,
23 fetch: Option<BoxFut<Result<Vec<DirEntry>, ZeroFsError>>>,
24 done: bool,
25}
26
27impl DirStream {
28 pub(crate) fn new(dir: Arc<Dir>) -> Self {
29 Self {
30 dir,
31 buf: VecDeque::new(),
32 fetch: None,
33 done: false,
34 }
35 }
36}
37
38impl Stream for DirStream {
39 type Item = Result<DirEntry, ZeroFsError>;
40
41 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42 loop {
43 if let Some(entry) = self.buf.pop_front() {
44 return Poll::Ready(Some(Ok(entry)));
45 }
46 if self.done {
47 return Poll::Ready(None);
48 }
49 if self.fetch.is_none() {
50 let dir = Arc::clone(&self.dir);
51 self.fetch = Some(Box::pin(async move { dir.next_batch(None).await }));
52 }
53 let batch = ready!(self.fetch.as_mut().unwrap().as_mut().poll(cx));
54 self.fetch = None;
55 match batch {
56 Ok(batch) if batch.is_empty() => {
58 self.done = true;
59 return Poll::Ready(None);
60 }
61 Ok(batch) => self.buf.extend(batch),
62 Err(e) => {
63 self.done = true;
64 return Poll::Ready(Some(Err(e)));
65 }
66 }
67 }
68 }
69}