swiftide_core/
query_stream.rs1use anyhow::Result;
5use std::pin::Pin;
6use tokio::sync::mpsc::Sender;
7use tokio_stream::wrappers::ReceiverStream;
8
9use futures_util::stream::Stream;
10pub use futures_util::{StreamExt, TryStreamExt};
11
12use crate::{query::QueryState, querying::Query};
13
14#[pin_project::pin_project]
18pub struct QueryStream<'stream, STATE: 'stream + QueryState> {
19 #[pin]
20 pub(crate) inner: Pin<Box<dyn Stream<Item = Result<Query<STATE>>> + Send + 'stream>>,
21
22 #[pin]
23 pub sender: Option<Sender<Result<Query<STATE>>>>,
24}
25
26impl<'stream, STATE: QueryState + 'stream> Default for QueryStream<'stream, STATE> {
27 fn default() -> Self {
28 let (sender, receiver) = tokio::sync::mpsc::channel(1000);
29
30 Self {
31 inner: ReceiverStream::new(receiver).boxed(),
32 sender: Some(sender),
33 }
34 }
35}
36
37impl<STATE: QueryState> Stream for QueryStream<'_, STATE> {
38 type Item = Result<Query<STATE>>;
39
40 fn poll_next(
41 self: Pin<&mut Self>,
42 cx: &mut std::task::Context<'_>,
43 ) -> std::task::Poll<Option<Self::Item>> {
44 let this = self.project();
45 this.inner.poll_next(cx)
46 }
47}
48
49impl<STATE: QueryState> From<Pin<Box<dyn Stream<Item = Result<Query<STATE>>> + Send>>>
50 for QueryStream<'_, STATE>
51{
52 fn from(val: Pin<Box<dyn Stream<Item = Result<Query<STATE>>> + Send>>) -> Self {
53 QueryStream {
54 inner: val,
55 sender: None,
56 }
57 }
58}