swiftide_core/
indexing_stream.rs1#![allow(clippy::from_over_into)]
2
3use crate::node::Node;
6use anyhow::Result;
7use futures_util::stream::{self, Stream};
8use std::pin::Pin;
9use tokio::sync::mpsc::Receiver;
10
11pub use futures_util::StreamExt;
12
13#[pin_project::pin_project]
20pub struct IndexingStream {
21 #[pin]
22 pub(crate) inner: Pin<Box<dyn Stream<Item = Result<Node>> + Send>>,
23}
24
25impl Stream for IndexingStream {
26 type Item = Result<Node>;
27
28 fn poll_next(
29 self: Pin<&mut Self>,
30 cx: &mut std::task::Context<'_>,
31 ) -> std::task::Poll<Option<Self::Item>> {
32 let this = self.project();
33 this.inner.poll_next(cx)
34 }
35}
36
37impl Into<IndexingStream> for Vec<Result<Node>> {
38 fn into(self) -> IndexingStream {
39 IndexingStream::iter(self)
40 }
41}
42
43impl Into<IndexingStream> for Vec<Node> {
44 fn into(self) -> IndexingStream {
45 IndexingStream::from_nodes(self)
46 }
47}
48
49impl Into<IndexingStream> for Result<Vec<Node>> {
56 fn into(self) -> IndexingStream {
57 match self {
58 Ok(nodes) => IndexingStream::iter(nodes.into_iter().map(Ok)),
59 Err(err) => IndexingStream::iter(vec![Err(err)]),
60 }
61 }
62}
63
64impl Into<IndexingStream> for Pin<Box<dyn Stream<Item = Result<Node>> + Send>> {
65 fn into(self) -> IndexingStream {
66 IndexingStream { inner: self }
67 }
68}
69
70impl Into<IndexingStream> for Receiver<Result<Node>> {
71 fn into(self) -> IndexingStream {
72 IndexingStream {
73 inner: tokio_stream::wrappers::ReceiverStream::new(self).boxed(),
74 }
75 }
76}
77
78impl From<anyhow::Error> for IndexingStream {
79 fn from(err: anyhow::Error) -> Self {
80 IndexingStream::iter(vec![Err(err)])
81 }
82}
83
84impl IndexingStream {
85 pub fn empty() -> Self {
86 IndexingStream {
87 inner: stream::empty().boxed(),
88 }
89 }
90
91 pub fn iter<I>(iter: I) -> Self
96 where
97 I: IntoIterator<Item = Result<Node>> + Send + 'static,
98 <I as IntoIterator>::IntoIter: Send,
99 {
100 IndexingStream {
101 inner: stream::iter(iter).boxed(),
102 }
103 }
104
105 pub fn from_nodes(nodes: Vec<Node>) -> Self {
106 IndexingStream::iter(nodes.into_iter().map(Ok))
107 }
108}