swiftide_core/
indexing_stream.rs1#![allow(clippy::from_over_into)]
2
3use crate::node::Node;
7use anyhow::Result;
8use futures_util::stream::{self, Stream};
9use std::pin::Pin;
10use tokio::sync::mpsc::Receiver;
11
12pub use futures_util::StreamExt;
13
14#[pin_project::pin_project]
21pub struct IndexingStream {
22 #[pin]
23 pub(crate) inner: Pin<Box<dyn Stream<Item = Result<Node>> + Send>>,
24}
25
26impl Stream for IndexingStream {
27 type Item = Result<Node>;
28
29 fn poll_next(
30 self: Pin<&mut Self>,
31 cx: &mut std::task::Context<'_>,
32 ) -> std::task::Poll<Option<Self::Item>> {
33 let this = self.project();
34 this.inner.poll_next(cx)
35 }
36}
37
38impl Into<IndexingStream> for Vec<Result<Node>> {
39 fn into(self) -> IndexingStream {
40 IndexingStream::iter(self)
41 }
42}
43
44impl Into<IndexingStream> for Vec<Node> {
45 fn into(self) -> IndexingStream {
46 IndexingStream::from_nodes(self)
47 }
48}
49
50impl Into<IndexingStream> for Result<Vec<Node>> {
57 fn into(self) -> IndexingStream {
58 match self {
59 Ok(nodes) => IndexingStream::iter(nodes.into_iter().map(Ok)),
60 Err(err) => IndexingStream::iter(vec![Err(err)]),
61 }
62 }
63}
64
65impl Into<IndexingStream> for Pin<Box<dyn Stream<Item = Result<Node>> + Send>> {
66 fn into(self) -> IndexingStream {
67 IndexingStream { inner: self }
68 }
69}
70
71impl Into<IndexingStream> for Receiver<Result<Node>> {
72 fn into(self) -> IndexingStream {
73 IndexingStream {
74 inner: tokio_stream::wrappers::ReceiverStream::new(self).boxed(),
75 }
76 }
77}
78
79impl From<anyhow::Error> for IndexingStream {
80 fn from(err: anyhow::Error) -> Self {
81 IndexingStream::iter(vec![Err(err)])
82 }
83}
84
85impl IndexingStream {
86 pub fn empty() -> Self {
87 IndexingStream {
88 inner: stream::empty().boxed(),
89 }
90 }
91
92 pub fn iter<I>(iter: I) -> Self
97 where
98 I: IntoIterator<Item = Result<Node>> + Send + 'static,
99 <I as IntoIterator>::IntoIter: Send,
100 {
101 IndexingStream {
102 inner: stream::iter(iter).boxed(),
103 }
104 }
105
106 pub fn from_nodes(nodes: Vec<Node>) -> Self {
107 IndexingStream::iter(nodes.into_iter().map(Ok))
108 }
109}