swiftide_core/
indexing_stream.rs

1#![allow(clippy::from_over_into)]
2
//! This module defines the `IndexingStream` type, which the indexing pipeline uses internally to handle asynchronous streams of `Node` items.
4
5use crate::node::Node;
6use anyhow::Result;
7use futures_util::stream::{self, Stream};
8use std::pin::Pin;
9use tokio::sync::mpsc::Receiver;
10
11pub use futures_util::StreamExt;
12
13// We need to inform the compiler that `inner` is pinned as well
14/// An asynchronous stream of `Node` items.
15///
16/// Wraps an internal stream of `Result<Node>` items.
17///
18/// Streams, iterators and vectors of `Result<Node>` can be converted into an `IndexingStream`.
19#[pin_project::pin_project]
20pub struct IndexingStream {
21    #[pin]
22    pub(crate) inner: Pin<Box<dyn Stream<Item = Result<Node>> + Send>>,
23}
24
25impl Stream for IndexingStream {
26    type Item = Result<Node>;
27
28    fn poll_next(
29        self: Pin<&mut Self>,
30        cx: &mut std::task::Context<'_>,
31    ) -> std::task::Poll<Option<Self::Item>> {
32        let this = self.project();
33        this.inner.poll_next(cx)
34    }
35}
36
37impl Into<IndexingStream> for Vec<Result<Node>> {
38    fn into(self) -> IndexingStream {
39        IndexingStream::iter(self)
40    }
41}
42
43impl Into<IndexingStream> for Vec<Node> {
44    fn into(self) -> IndexingStream {
45        IndexingStream::from_nodes(self)
46    }
47}
48
49// impl Into<IndexingStream> for anyhow::Error {
50//     fn into(self) -> IndexingStream {
51//         IndexingStream::iter(vec![Err(self)])
52//     }
53// }
54
55impl Into<IndexingStream> for Result<Vec<Node>> {
56    fn into(self) -> IndexingStream {
57        match self {
58            Ok(nodes) => IndexingStream::iter(nodes.into_iter().map(Ok)),
59            Err(err) => IndexingStream::iter(vec![Err(err)]),
60        }
61    }
62}
63
64impl Into<IndexingStream> for Pin<Box<dyn Stream<Item = Result<Node>> + Send>> {
65    fn into(self) -> IndexingStream {
66        IndexingStream { inner: self }
67    }
68}
69
70impl Into<IndexingStream> for Receiver<Result<Node>> {
71    fn into(self) -> IndexingStream {
72        IndexingStream {
73            inner: tokio_stream::wrappers::ReceiverStream::new(self).boxed(),
74        }
75    }
76}
77
78impl From<anyhow::Error> for IndexingStream {
79    fn from(err: anyhow::Error) -> Self {
80        IndexingStream::iter(vec![Err(err)])
81    }
82}
83
84impl IndexingStream {
85    pub fn empty() -> Self {
86        IndexingStream {
87            inner: stream::empty().boxed(),
88        }
89    }
90
91    /// Creates an `IndexingStream` from an iterator of `Result<Node>`.
92    ///
93    /// WARN: Also works with Err items directly, which will result
94    /// in an _incorrect_ stream
95    pub fn iter<I>(iter: I) -> Self
96    where
97        I: IntoIterator<Item = Result<Node>> + Send + 'static,
98        <I as IntoIterator>::IntoIter: Send,
99    {
100        IndexingStream {
101            inner: stream::iter(iter).boxed(),
102        }
103    }
104
105    pub fn from_nodes(nodes: Vec<Node>) -> Self {
106        IndexingStream::iter(nodes.into_iter().map(Ok))
107    }
108}