fraiseql_wire/stream/
query_stream.rs1use crate::stream::JsonStream;
8use crate::{Error, Result};
9use futures::stream::Stream;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use std::marker::PhantomData;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16type Predicate = Box<dyn Fn(&Value) -> bool + Send>;
18
19pub struct QueryStream<T: DeserializeOwned + Unpin> {
25 inner: JsonStream,
27 predicate: Option<Predicate>,
29 _phantom: PhantomData<T>,
31}
32
33impl<T: DeserializeOwned + Unpin> QueryStream<T> {
34 pub fn new(inner: JsonStream, predicate: Option<Predicate>) -> Self {
36 Self {
37 inner,
38 predicate,
39 _phantom: PhantomData,
40 }
41 }
42
43 pub async fn pause(&mut self) -> Result<()> {
45 self.inner.pause().await
46 }
47
48 pub async fn resume(&mut self) -> Result<()> {
50 self.inner.resume().await
51 }
52
53 pub fn stats(&self) -> crate::stream::StreamStats {
55 self.inner.stats()
56 }
57
58 pub fn state_snapshot(&self) -> crate::stream::StreamState {
60 self.inner.state_snapshot()
61 }
62
63 pub fn paused_occupancy(&self) -> usize {
65 self.inner.paused_occupancy()
66 }
67
68 pub async fn pause_with_reason(&mut self, reason: &str) -> Result<()> {
70 self.inner.pause_with_reason(reason).await
71 }
72
73 fn deserialize_value(value: Value) -> Result<T> {
75 match serde_json::from_value::<T>(value) {
76 Ok(result) => Ok(result),
77 Err(e) => Err(Error::Deserialization {
78 type_name: std::any::type_name::<T>().to_string(),
79 details: e.to_string(),
80 }),
81 }
82 }
83}
84
85impl<T: DeserializeOwned + Unpin> Stream for QueryStream<T> {
86 type Item = Result<T>;
87
88 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
89 loop {
90 match Pin::new(&mut self.inner).poll_next(cx) {
92 Poll::Ready(Some(Ok(value))) => {
93 if let Some(ref predicate) = self.predicate {
95 if !predicate(&value) {
96 continue;
98 }
99 }
100
101 return Poll::Ready(Some(Self::deserialize_value(value)));
103 }
104 Poll::Ready(Some(Err(e))) => {
105 return Poll::Ready(Some(Err(e)));
107 }
108 Poll::Ready(None) => {
109 return Poll::Ready(None);
111 }
112 Poll::Pending => {
113 return Poll::Pending;
114 }
115 }
116 }
117 }
118}
119
120impl<T: DeserializeOwned + Unpin> Unpin for QueryStream<T> {}