Skip to main content

zerofs_client/
stream.rs

1//! `stream` feature: a [`futures_core::Stream`] adapter over [`Dir`] listing,
2//! so a directory can be consumed with `StreamExt` (`.next()`, `.collect()`,
3//! `try_for_each`, ...). Rust-only; never crosses the FFI boundary.
4
5use crate::dir::Dir;
6use crate::error::ZeroFsError;
7use crate::types::DirEntry;
8use futures_core::Stream;
9use std::collections::VecDeque;
10use std::future::Future;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll, ready};
14
/// An owned, heap-pinned, type-erased future that resolves to `T` and may be
/// moved across threads.
type BoxFut<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
16
17/// A pull-based stream of [`DirEntry`] over an open [`Dir`], yielding entries
18/// one at a time (fetched a server batch at a time underneath). It holds the
19/// `Dir` open for its lifetime; an error ends the stream after it is yielded.
20pub struct DirStream {
21    dir: Arc<Dir>,
22    buf: VecDeque<DirEntry>,
23    fetch: Option<BoxFut<Result<Vec<DirEntry>, ZeroFsError>>>,
24    done: bool,
25}
26
27impl DirStream {
28    pub(crate) fn new(dir: Arc<Dir>) -> Self {
29        Self {
30            dir,
31            buf: VecDeque::new(),
32            fetch: None,
33            done: false,
34        }
35    }
36}
37
38impl Stream for DirStream {
39    type Item = Result<DirEntry, ZeroFsError>;
40
41    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42        loop {
43            if let Some(entry) = self.buf.pop_front() {
44                return Poll::Ready(Some(Ok(entry)));
45            }
46            if self.done {
47                return Poll::Ready(None);
48            }
49            if self.fetch.is_none() {
50                let dir = Arc::clone(&self.dir);
51                self.fetch = Some(Box::pin(async move { dir.next_batch(None).await }));
52            }
53            let batch = ready!(self.fetch.as_mut().unwrap().as_mut().poll(cx));
54            self.fetch = None;
55            match batch {
56                // An empty batch is end-of-directory.
57                Ok(batch) if batch.is_empty() => {
58                    self.done = true;
59                    return Poll::Ready(None);
60                }
61                Ok(batch) => self.buf.extend(batch),
62                Err(e) => {
63                    self.done = true;
64                    return Poll::Ready(Some(Err(e)));
65                }
66            }
67        }
68    }
69}