swiftide_core/
indexing_stream.rs

1#![allow(clippy::from_over_into)]
2
3//! This module defines the `IndexingStream` type, which is used internally by a pipeline  for
4//! handling asynchronous streams of `Node` items in the indexing pipeline.
5
6use crate::node::Node;
7use anyhow::Result;
8use futures_util::stream::{self, Stream};
9use std::pin::Pin;
10use tokio::sync::mpsc::Receiver;
11
12pub use futures_util::StreamExt;
13
14// We need to inform the compiler that `inner` is pinned as well
15/// An asynchronous stream of `Node` items.
16///
17/// Wraps an internal stream of `Result<Node>` items.
18///
19/// Streams, iterators and vectors of `Result<Node>` can be converted into an `IndexingStream`.
20#[pin_project::pin_project]
21pub struct IndexingStream {
22    #[pin]
23    pub(crate) inner: Pin<Box<dyn Stream<Item = Result<Node>> + Send>>,
24}
25
26impl Stream for IndexingStream {
27    type Item = Result<Node>;
28
29    fn poll_next(
30        self: Pin<&mut Self>,
31        cx: &mut std::task::Context<'_>,
32    ) -> std::task::Poll<Option<Self::Item>> {
33        let this = self.project();
34        this.inner.poll_next(cx)
35    }
36}
37
38impl Into<IndexingStream> for Vec<Result<Node>> {
39    fn into(self) -> IndexingStream {
40        IndexingStream::iter(self)
41    }
42}
43
44impl Into<IndexingStream> for Vec<Node> {
45    fn into(self) -> IndexingStream {
46        IndexingStream::from_nodes(self)
47    }
48}
49
50// impl Into<IndexingStream> for anyhow::Error {
51//     fn into(self) -> IndexingStream {
52//         IndexingStream::iter(vec![Err(self)])
53//     }
54// }
55
56impl Into<IndexingStream> for Result<Vec<Node>> {
57    fn into(self) -> IndexingStream {
58        match self {
59            Ok(nodes) => IndexingStream::iter(nodes.into_iter().map(Ok)),
60            Err(err) => IndexingStream::iter(vec![Err(err)]),
61        }
62    }
63}
64
65impl Into<IndexingStream> for Pin<Box<dyn Stream<Item = Result<Node>> + Send>> {
66    fn into(self) -> IndexingStream {
67        IndexingStream { inner: self }
68    }
69}
70
71impl Into<IndexingStream> for Receiver<Result<Node>> {
72    fn into(self) -> IndexingStream {
73        IndexingStream {
74            inner: tokio_stream::wrappers::ReceiverStream::new(self).boxed(),
75        }
76    }
77}
78
79impl From<anyhow::Error> for IndexingStream {
80    fn from(err: anyhow::Error) -> Self {
81        IndexingStream::iter(vec![Err(err)])
82    }
83}
84
85impl IndexingStream {
86    pub fn empty() -> Self {
87        IndexingStream {
88            inner: stream::empty().boxed(),
89        }
90    }
91
92    /// Creates an `IndexingStream` from an iterator of `Result<Node>`.
93    ///
94    /// WARN: Also works with Err items directly, which will result
95    /// in an _incorrect_ stream
96    pub fn iter<I>(iter: I) -> Self
97    where
98        I: IntoIterator<Item = Result<Node>> + Send + 'static,
99        <I as IntoIterator>::IntoIter: Send,
100    {
101        IndexingStream {
102            inner: stream::iter(iter).boxed(),
103        }
104    }
105
106    pub fn from_nodes(nodes: Vec<Node>) -> Self {
107        IndexingStream::iter(nodes.into_iter().map(Ok))
108    }
109}