1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#![allow(clippy::from_over_into)]

//! This module defines the `IndexingStream` type, which is used internally by a pipeline  for handling asynchronous streams of `Node` items in the indexing pipeline.

use crate::node::Node;
use anyhow::Result;
use futures_util::stream::{self, Stream};
use pin_project_lite::pin_project;
use std::pin::Pin;
use tokio::sync::mpsc::Receiver;

pub use futures_util::StreamExt;

// We need to inform the compiler that `inner` is pinned as well
pin_project! {
    /// An asynchronous stream of `Node` items.
    ///
    /// Wraps an internal stream of `Result<Node>` items.
    ///
    /// Streams, iterators and vectors of `Result<Node>` can be converted into an `IndexingStream`.
    pub struct IndexingStream {
        #[pin]
        pub(crate) inner: Pin<Box<dyn Stream<Item = Result<Node>> + Send>>,
    }
}

impl Stream for IndexingStream {
    type Item = Result<Node>;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = self.project();
        this.inner.poll_next(cx)
    }
}

impl Into<IndexingStream> for Vec<Result<Node>> {
    fn into(self) -> IndexingStream {
        IndexingStream::iter(self)
    }
}

impl Into<IndexingStream> for Result<Vec<Node>> {
    fn into(self) -> IndexingStream {
        match self {
            Ok(nodes) => IndexingStream::iter(nodes.into_iter().map(Ok)),
            Err(err) => IndexingStream::iter(vec![Err(err)]),
        }
    }
}

impl Into<IndexingStream> for Pin<Box<dyn Stream<Item = Result<Node>> + Send>> {
    fn into(self) -> IndexingStream {
        IndexingStream { inner: self }
    }
}

impl Into<IndexingStream> for Receiver<Result<Node>> {
    fn into(self) -> IndexingStream {
        IndexingStream {
            inner: tokio_stream::wrappers::ReceiverStream::new(self).boxed(),
        }
    }
}

impl IndexingStream {
    pub fn empty() -> Self {
        IndexingStream {
            inner: stream::empty().boxed(),
        }
    }

    pub fn iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = Result<Node>> + Send + 'static,
        <I as IntoIterator>::IntoIter: Send,
    {
        IndexingStream {
            inner: stream::iter(iter).boxed(),
        }
    }
}