Skip to main content

fraiseql_wire/stream/
query_stream.rs

//! Query stream with pause/resume/stats capabilities.
//!
//! The stream combines a `JsonStream` (which provides the control methods) with
//! optional filtering and type-safe deserialization. It exposes `pause()`, `resume()`,
//! and `stats()` while implementing `Stream<Item = Result<T>>`.
6
7use crate::stream::JsonStream;
8use crate::{Error, Result};
9use futures::stream::Stream;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use std::marker::PhantomData;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16/// Type alias for Rust-side predicate function
17type Predicate = Box<dyn Fn(&Value) -> bool + Send>;
18
19/// Query stream with pause/resume/stats capabilities
20///
21/// This stream combines JsonStream (with control methods) with optional filtering
22/// and type-safe deserialization. It exposes pause(), resume(), and stats() methods
23/// while implementing `Stream<Item = Result<T>>`.
24pub struct QueryStream<T: DeserializeOwned + Unpin> {
25    /// Inner JSON stream (provides pause/resume/stats)
26    inner: JsonStream,
27    /// Optional Rust-side predicate for filtering
28    predicate: Option<Predicate>,
29    /// Type marker for deserialization target
30    _phantom: PhantomData<T>,
31}
32
33impl<T: DeserializeOwned + Unpin> QueryStream<T> {
34    /// Create a new query stream
35    pub fn new(inner: JsonStream, predicate: Option<Predicate>) -> Self {
36        Self {
37            inner,
38            predicate,
39            _phantom: PhantomData,
40        }
41    }
42
43    /// Pause the stream
44    pub async fn pause(&mut self) -> Result<()> {
45        self.inner.pause().await
46    }
47
48    /// Resume the stream
49    pub async fn resume(&mut self) -> Result<()> {
50        self.inner.resume().await
51    }
52
53    /// Get stream statistics
54    pub fn stats(&self) -> crate::stream::StreamStats {
55        self.inner.stats()
56    }
57
58    /// Get current stream state snapshot
59    pub fn state_snapshot(&self) -> crate::stream::StreamState {
60        self.inner.state_snapshot()
61    }
62
63    /// Get buffered rows when paused
64    pub fn paused_occupancy(&self) -> usize {
65        self.inner.paused_occupancy()
66    }
67
68    /// Pause with diagnostic reason
69    pub async fn pause_with_reason(&mut self, reason: &str) -> Result<()> {
70        self.inner.pause_with_reason(reason).await
71    }
72
73    /// Deserialize a JSON value to type T
74    fn deserialize_value(value: Value) -> Result<T> {
75        match serde_json::from_value::<T>(value) {
76            Ok(result) => Ok(result),
77            Err(e) => Err(Error::Deserialization {
78                type_name: std::any::type_name::<T>().to_string(),
79                details: e.to_string(),
80            }),
81        }
82    }
83}
84
85impl<T: DeserializeOwned + Unpin> Stream for QueryStream<T> {
86    type Item = Result<T>;
87
88    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
89        loop {
90            // Poll the inner JsonStream
91            match Pin::new(&mut self.inner).poll_next(cx) {
92                Poll::Ready(Some(Ok(value))) => {
93                    // Apply predicate if present
94                    if let Some(ref predicate) = self.predicate {
95                        if !predicate(&value) {
96                            // Filtered out, try next value
97                            continue;
98                        }
99                    }
100
101                    // Deserialize to target type T
102                    return Poll::Ready(Some(Self::deserialize_value(value)));
103                }
104                Poll::Ready(Some(Err(e))) => {
105                    // Propagate errors
106                    return Poll::Ready(Some(Err(e)));
107                }
108                Poll::Ready(None) => {
109                    // End of stream
110                    return Poll::Ready(None);
111                }
112                Poll::Pending => {
113                    return Poll::Pending;
114                }
115            }
116        }
117    }
118}
119
120impl<T: DeserializeOwned + Unpin> Unpin for QueryStream<T> {}