swiftide_core/
query_stream.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
//! Internally used by a query pipeline
//!
//! Has a sender and receiver to initialize the stream
use anyhow::Result;
use std::pin::Pin;
use tokio::sync::mpsc::Sender;
use tokio_stream::wrappers::ReceiverStream;

use futures_util::stream::Stream;
pub use futures_util::{StreamExt, TryStreamExt};

use crate::{query::QueryState, querying::Query};

/// Internally used by a query pipeline
///
/// Has a sender and receiver to initialize the stream
#[pin_project::pin_project]
pub struct QueryStream<'stream, STATE: 'stream + QueryState> {
    #[pin]
    pub(crate) inner: Pin<Box<dyn Stream<Item = Result<Query<STATE>>> + Send + 'stream>>,

    #[pin]
    pub sender: Option<Sender<Result<Query<STATE>>>>,
}

impl<'stream, STATE: QueryState + 'stream> Default for QueryStream<'stream, STATE> {
    fn default() -> Self {
        let (sender, receiver) = tokio::sync::mpsc::channel(1000);

        Self {
            inner: ReceiverStream::new(receiver).boxed(),
            sender: Some(sender),
        }
    }
}

impl<STATE: QueryState> Stream for QueryStream<'_, STATE> {
    type Item = Result<Query<STATE>>;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = self.project();
        this.inner.poll_next(cx)
    }
}

impl<STATE: QueryState> From<Pin<Box<dyn Stream<Item = Result<Query<STATE>>> + Send>>>
    for QueryStream<'_, STATE>
{
    fn from(val: Pin<Box<dyn Stream<Item = Result<Query<STATE>>> + Send>>) -> Self {
        QueryStream {
            inner: val,
            sender: None,
        }
    }
}