swiftide_core/
indexing_stream.rs1#![allow(clippy::from_over_into)]
2
3use crate::node::Node;
6use anyhow::Result;
7use futures_util::stream::{self, Stream};
8use std::pin::Pin;
9use tokio::sync::mpsc::Receiver;
10
11pub use futures_util::StreamExt;
12
13#[pin_project::pin_project]
20pub struct IndexingStream {
21 #[pin]
22 pub(crate) inner: Pin<Box<dyn Stream<Item = Result<Node>> + Send>>,
23}
24
25impl Stream for IndexingStream {
26 type Item = Result<Node>;
27
28 fn poll_next(
29 self: Pin<&mut Self>,
30 cx: &mut std::task::Context<'_>,
31 ) -> std::task::Poll<Option<Self::Item>> {
32 let this = self.project();
33 this.inner.poll_next(cx)
34 }
35}
36
37impl Into<IndexingStream> for Vec<Result<Node>> {
38 fn into(self) -> IndexingStream {
39 IndexingStream::iter(self)
40 }
41}
42
43impl Into<IndexingStream> for Vec<Node> {
44 fn into(self) -> IndexingStream {
45 IndexingStream::from_nodes(self)
46 }
47}
48
49impl Into<IndexingStream> for anyhow::Error {
50 fn into(self) -> IndexingStream {
51 IndexingStream::iter(vec![Err(self)])
52 }
53}
54
55impl Into<IndexingStream> for Result<Vec<Node>> {
56 fn into(self) -> IndexingStream {
57 match self {
58 Ok(nodes) => IndexingStream::iter(nodes.into_iter().map(Ok)),
59 Err(err) => IndexingStream::iter(vec![Err(err)]),
60 }
61 }
62}
63
64impl Into<IndexingStream> for Pin<Box<dyn Stream<Item = Result<Node>> + Send>> {
65 fn into(self) -> IndexingStream {
66 IndexingStream { inner: self }
67 }
68}
69
70impl Into<IndexingStream> for Receiver<Result<Node>> {
71 fn into(self) -> IndexingStream {
72 IndexingStream {
73 inner: tokio_stream::wrappers::ReceiverStream::new(self).boxed(),
74 }
75 }
76}
77
78impl IndexingStream {
79 pub fn empty() -> Self {
80 IndexingStream {
81 inner: stream::empty().boxed(),
82 }
83 }
84
85 pub fn iter<I>(iter: I) -> Self
90 where
91 I: IntoIterator<Item = Result<Node>> + Send + 'static,
92 <I as IntoIterator>::IntoIter: Send,
93 {
94 IndexingStream {
95 inner: stream::iter(iter).boxed(),
96 }
97 }
98
99 pub fn from_nodes(nodes: Vec<Node>) -> Self {
100 IndexingStream::iter(nodes.into_iter().map(Ok))
101 }
102}