fraiseql_wire/stream/query_stream.rs
1//! Query stream with pause/resume/stats capabilities
2//!
3//! This stream combines `JsonStream` (with control methods) with optional filtering
4//! and type-safe deserialization. It exposes `pause()`, `resume()`, and `stats()` methods
5//! while implementing `Stream<Item = Result<T>>`.
6
7use crate::stream::JsonStream;
8use crate::{Error, Result};
9use futures::stream::Stream;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use std::marker::PhantomData;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16/// Type alias for Rust-side predicate function
17type Predicate = Box<dyn Fn(&Value) -> bool + Send>;
18
19/// Query stream with pause/resume/stats capabilities
20///
21/// This stream combines `JsonStream` (with control methods) with optional filtering
22/// and type-safe deserialization. It exposes `pause()`, `resume()`, and `stats()` methods
23/// while implementing `Stream<Item = Result<T>>`.
24pub struct QueryStream<T: DeserializeOwned + Unpin> {
25 /// Inner JSON stream (provides pause/resume/stats)
26 inner: JsonStream,
27 /// Optional Rust-side predicate for filtering
28 predicate: Option<Predicate>,
29 /// Type marker for deserialization target
30 _phantom: PhantomData<T>,
31}
32
33impl<T: DeserializeOwned + Unpin> QueryStream<T> {
34 /// Create a new query stream
35 pub fn new(inner: JsonStream, predicate: Option<Predicate>) -> Self {
36 Self {
37 inner,
38 predicate,
39 _phantom: PhantomData,
40 }
41 }
42
43 /// Pause the stream
44 ///
45 /// # Errors
46 ///
47 /// Returns `Error::Protocol` if the stream has already completed or failed.
48 pub async fn pause(&mut self) -> Result<()> {
49 self.inner.pause().await
50 }
51
52 /// Resume the stream
53 ///
54 /// # Errors
55 ///
56 /// Returns `Error::Protocol` if the stream has already completed or failed.
57 pub async fn resume(&mut self) -> Result<()> {
58 self.inner.resume().await
59 }
60
61 /// Get stream statistics
62 pub fn stats(&self) -> crate::stream::StreamStats {
63 self.inner.stats()
64 }
65
66 /// Get current stream state snapshot
67 pub fn state_snapshot(&self) -> crate::stream::StreamState {
68 self.inner.state_snapshot()
69 }
70
71 /// Get buffered rows when paused
72 pub fn paused_occupancy(&self) -> usize {
73 self.inner.paused_occupancy()
74 }
75
76 /// Pause with diagnostic reason
77 ///
78 /// # Errors
79 ///
80 /// Returns `Error::Protocol` if the stream has already completed or failed.
81 pub async fn pause_with_reason(&mut self, reason: &str) -> Result<()> {
82 self.inner.pause_with_reason(reason).await
83 }
84
85 /// Deserialize a JSON value to type T
86 fn deserialize_value(value: Value) -> Result<T> {
87 match serde_json::from_value::<T>(value) {
88 Ok(result) => Ok(result),
89 Err(e) => Err(Error::Deserialization {
90 type_name: std::any::type_name::<T>().to_string(),
91 details: e.to_string(),
92 }),
93 }
94 }
95}
96
97impl<T: DeserializeOwned + Unpin> Stream for QueryStream<T> {
98 type Item = Result<T>;
99
100 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
101 loop {
102 // Poll the inner JsonStream
103 match Pin::new(&mut self.inner).poll_next(cx) {
104 Poll::Ready(Some(Ok(value))) => {
105 // Apply predicate if present
106 if let Some(ref predicate) = self.predicate {
107 if !predicate(&value) {
108 // Filtered out, try next value
109 continue;
110 }
111 }
112
113 // Deserialize to target type T
114 return Poll::Ready(Some(Self::deserialize_value(value)));
115 }
116 Poll::Ready(Some(Err(e))) => {
117 // Propagate errors
118 return Poll::Ready(Some(Err(e)));
119 }
120 Poll::Ready(None) => {
121 // End of stream
122 return Poll::Ready(None);
123 }
124 Poll::Pending => {
125 return Poll::Pending;
126 }
127 }
128 }
129 }
130}
131
132impl<T: DeserializeOwned + Unpin> Unpin for QueryStream<T> {}