Skip to main content

fraiseql_wire/stream/
query_stream.rs

1//! Query stream with pause/resume/stats capabilities
2//!
3//! This stream combines `JsonStream` (with control methods) with optional filtering
4//! and type-safe deserialization. It exposes `pause()`, `resume()`, and `stats()` methods
5//! while implementing `Stream<Item = Result<T>>`.
6
7use crate::stream::JsonStream;
8use crate::{Error, Result};
9use futures::stream::Stream;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use std::marker::PhantomData;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16/// Type alias for Rust-side predicate function
17type Predicate = Box<dyn Fn(&Value) -> bool + Send>;
18
19/// Query stream with pause/resume/stats capabilities
20///
21/// This stream combines `JsonStream` (with control methods) with optional filtering
22/// and type-safe deserialization. It exposes `pause()`, `resume()`, and `stats()` methods
23/// while implementing `Stream<Item = Result<T>>`.
24pub struct QueryStream<T: DeserializeOwned + Unpin> {
25    /// Inner JSON stream (provides pause/resume/stats)
26    inner: JsonStream,
27    /// Optional Rust-side predicate for filtering
28    predicate: Option<Predicate>,
29    /// Type marker for deserialization target
30    _phantom: PhantomData<T>,
31}
32
33impl<T: DeserializeOwned + Unpin> QueryStream<T> {
34    /// Create a new query stream
35    pub fn new(inner: JsonStream, predicate: Option<Predicate>) -> Self {
36        Self {
37            inner,
38            predicate,
39            _phantom: PhantomData,
40        }
41    }
42
43    /// Pause the stream
44    ///
45    /// # Errors
46    ///
47    /// Returns `Error::Protocol` if the stream has already completed or failed.
48    pub async fn pause(&mut self) -> Result<()> {
49        self.inner.pause().await
50    }
51
52    /// Resume the stream
53    ///
54    /// # Errors
55    ///
56    /// Returns `Error::Protocol` if the stream has already completed or failed.
57    pub async fn resume(&mut self) -> Result<()> {
58        self.inner.resume().await
59    }
60
61    /// Get stream statistics
62    pub fn stats(&self) -> crate::stream::StreamStats {
63        self.inner.stats()
64    }
65
66    /// Get current stream state snapshot
67    pub fn state_snapshot(&self) -> crate::stream::StreamState {
68        self.inner.state_snapshot()
69    }
70
71    /// Get buffered rows when paused
72    pub fn paused_occupancy(&self) -> usize {
73        self.inner.paused_occupancy()
74    }
75
76    /// Pause with diagnostic reason
77    ///
78    /// # Errors
79    ///
80    /// Returns `Error::Protocol` if the stream has already completed or failed.
81    pub async fn pause_with_reason(&mut self, reason: &str) -> Result<()> {
82        self.inner.pause_with_reason(reason).await
83    }
84
85    /// Deserialize a JSON value to type T
86    fn deserialize_value(value: Value) -> Result<T> {
87        match serde_json::from_value::<T>(value) {
88            Ok(result) => Ok(result),
89            Err(e) => Err(Error::Deserialization {
90                type_name: std::any::type_name::<T>().to_string(),
91                details: e.to_string(),
92            }),
93        }
94    }
95}
96
97impl<T: DeserializeOwned + Unpin> Stream for QueryStream<T> {
98    type Item = Result<T>;
99
100    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
101        loop {
102            // Poll the inner JsonStream
103            match Pin::new(&mut self.inner).poll_next(cx) {
104                Poll::Ready(Some(Ok(value))) => {
105                    // Apply predicate if present
106                    if let Some(ref predicate) = self.predicate {
107                        if !predicate(&value) {
108                            // Filtered out, try next value
109                            continue;
110                        }
111                    }
112
113                    // Deserialize to target type T
114                    return Poll::Ready(Some(Self::deserialize_value(value)));
115                }
116                Poll::Ready(Some(Err(e))) => {
117                    // Propagate errors
118                    return Poll::Ready(Some(Err(e)));
119                }
120                Poll::Ready(None) => {
121                    // End of stream
122                    return Poll::Ready(None);
123                }
124                Poll::Pending => {
125                    return Poll::Pending;
126                }
127            }
128        }
129    }
130}
131
132impl<T: DeserializeOwned + Unpin> Unpin for QueryStream<T> {}