swiftide_core/
query_stream.rs

1//! Internally used by a query pipeline
2//!
3//! Has a sender and receiver to initialize the stream
4use anyhow::Result;
5use std::pin::Pin;
6use tokio::sync::mpsc::Sender;
7use tokio_stream::wrappers::ReceiverStream;
8
9use futures_util::stream::Stream;
10pub use futures_util::{StreamExt, TryStreamExt};
11
12use crate::{query::QueryState, querying::Query};
13
14/// Internally used by a query pipeline
15///
16/// Has a sender and receiver to initialize the stream
17#[pin_project::pin_project]
18pub struct QueryStream<'stream, STATE: 'stream + QueryState> {
19    #[pin]
20    pub(crate) inner: Pin<Box<dyn Stream<Item = Result<Query<STATE>>> + Send + 'stream>>,
21
22    #[pin]
23    pub sender: Option<Sender<Result<Query<STATE>>>>,
24}
25
26impl<'stream, STATE: QueryState + 'stream> Default for QueryStream<'stream, STATE> {
27    fn default() -> Self {
28        let (sender, receiver) = tokio::sync::mpsc::channel(1000);
29
30        Self {
31            inner: ReceiverStream::new(receiver).boxed(),
32            sender: Some(sender),
33        }
34    }
35}
36
37impl<STATE: QueryState> Stream for QueryStream<'_, STATE> {
38    type Item = Result<Query<STATE>>;
39
40    fn poll_next(
41        self: Pin<&mut Self>,
42        cx: &mut std::task::Context<'_>,
43    ) -> std::task::Poll<Option<Self::Item>> {
44        let this = self.project();
45        this.inner.poll_next(cx)
46    }
47}
48
49impl<STATE: QueryState> From<Pin<Box<dyn Stream<Item = Result<Query<STATE>>> + Send>>>
50    for QueryStream<'_, STATE>
51{
52    fn from(val: Pin<Box<dyn Stream<Item = Result<Query<STATE>>> + Send>>) -> Self {
53        QueryStream {
54            inner: val,
55            sender: None,
56        }
57    }
58}