swiftide_core/
indexing_stream.rs

1#![allow(clippy::from_over_into)]
2
3//! This module defines the `IndexingStream` type, which is used internally by a pipeline  for handling asynchronous streams of `Node` items in the indexing pipeline.
4
5use crate::node::Node;
6use anyhow::Result;
7use futures_util::stream::{self, Stream};
8use std::pin::Pin;
9use tokio::sync::mpsc::Receiver;
10
11pub use futures_util::StreamExt;
12
13// We need to inform the compiler that `inner` is pinned as well
14/// An asynchronous stream of `Node` items.
15///
16/// Wraps an internal stream of `Result<Node>` items.
17///
18/// Streams, iterators and vectors of `Result<Node>` can be converted into an `IndexingStream`.
19#[pin_project::pin_project]
20pub struct IndexingStream {
21    #[pin]
22    pub(crate) inner: Pin<Box<dyn Stream<Item = Result<Node>> + Send>>,
23}
24
25impl Stream for IndexingStream {
26    type Item = Result<Node>;
27
28    fn poll_next(
29        self: Pin<&mut Self>,
30        cx: &mut std::task::Context<'_>,
31    ) -> std::task::Poll<Option<Self::Item>> {
32        let this = self.project();
33        this.inner.poll_next(cx)
34    }
35}
36
37impl Into<IndexingStream> for Vec<Result<Node>> {
38    fn into(self) -> IndexingStream {
39        IndexingStream::iter(self)
40    }
41}
42
43impl Into<IndexingStream> for Vec<Node> {
44    fn into(self) -> IndexingStream {
45        IndexingStream::from_nodes(self)
46    }
47}
48
49impl Into<IndexingStream> for anyhow::Error {
50    fn into(self) -> IndexingStream {
51        IndexingStream::iter(vec![Err(self)])
52    }
53}
54
55impl Into<IndexingStream> for Result<Vec<Node>> {
56    fn into(self) -> IndexingStream {
57        match self {
58            Ok(nodes) => IndexingStream::iter(nodes.into_iter().map(Ok)),
59            Err(err) => IndexingStream::iter(vec![Err(err)]),
60        }
61    }
62}
63
64impl Into<IndexingStream> for Pin<Box<dyn Stream<Item = Result<Node>> + Send>> {
65    fn into(self) -> IndexingStream {
66        IndexingStream { inner: self }
67    }
68}
69
70impl Into<IndexingStream> for Receiver<Result<Node>> {
71    fn into(self) -> IndexingStream {
72        IndexingStream {
73            inner: tokio_stream::wrappers::ReceiverStream::new(self).boxed(),
74        }
75    }
76}
77
78impl IndexingStream {
79    pub fn empty() -> Self {
80        IndexingStream {
81            inner: stream::empty().boxed(),
82        }
83    }
84
85    /// Creates an `IndexingStream` from an iterator of `Result<Node>`.
86    ///
87    /// WARN: Also works with Err items directly, which will result
88    /// in an _incorrect_ stream
89    pub fn iter<I>(iter: I) -> Self
90    where
91        I: IntoIterator<Item = Result<Node>> + Send + 'static,
92        <I as IntoIterator>::IntoIter: Send,
93    {
94        IndexingStream {
95            inner: stream::iter(iter).boxed(),
96        }
97    }
98
99    pub fn from_nodes(nodes: Vec<Node>) -> Self {
100        IndexingStream::iter(nodes.into_iter().map(Ok))
101    }
102}