Skip to main content

fraiseql_wire/stream/
typed_stream.rs

1//! Typed JSON stream implementation
2//!
3//! `TypedJsonStream` wraps a raw JSON stream and deserializes each item to a target type T.
4//! Type T is **consumer-side only** - it does NOT affect SQL generation, filtering,
5//! ordering, or wire protocol. Deserialization happens lazily at `poll_next()`.
6
7use crate::{Error, Result};
8use futures::stream::Stream;
9use futures::StreamExt;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use std::marker::PhantomData;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16/// Typed JSON stream that deserializes rows to type T
17///
18/// This stream wraps a raw JSON stream and deserializes each Value to the target type T.
19///
20/// **Important**: Type T is **consumer-side only**.
21/// - T does NOT affect SQL generation (still `SELECT data FROM v_{entity}`)
22/// - T does NOT affect filtering (`where_sql`, `where_rust`, `order_by`)
23/// - T does NOT affect wire protocol (identical for all T)
24/// - T ONLY affects consumer-side deserialization at `poll_next()`
25///
26/// # Examples
27///
28/// ```no_run
29/// use serde::Deserialize;
30/// use futures::stream::StreamExt;
31///
32/// #[derive(Deserialize)]
33/// struct Project {
34///     id: String,
35///     name: String,
36/// }
37///
38/// # async fn typed_example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
39/// let mut stream = client.query::<Project>("projects").execute().await?;
40/// while let Some(result) = stream.next().await {
41///     let project: Project = result?;
42///     println!("Project: {}", project.name);
43/// }
44/// # Ok(())
45/// # }
46///
47/// // Escape hatch: Always use Value if needed
48/// # async fn value_example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
49/// let mut stream = client.query::<serde_json::Value>("projects").execute().await?;
50/// while let Some(result) = stream.next().await {
51///     let json: serde_json::Value = result?;
52///     println!("Raw JSON: {:?}", json);
53/// }
54/// # Ok(())
55/// # }
56/// ```
57pub struct TypedJsonStream<T: DeserializeOwned> {
58    /// Inner stream of JSON values.
59    ///
60    /// The `Send` bound ensures that `TypedJsonStream` itself is `Send`,
61    /// allowing it to be held across `.await` points in async tasks and
62    /// transferred between threads (e.g. via `tokio::spawn`).
63    inner: Box<dyn Stream<Item = Result<Value>> + Send + Unpin>,
64    /// Phantom data for type T (zero runtime cost)
65    _phantom: PhantomData<T>,
66}
67
68impl<T: DeserializeOwned> TypedJsonStream<T> {
69    /// Create a new typed stream from a raw JSON stream
70    pub fn new(inner: Box<dyn Stream<Item = Result<Value>> + Send + Unpin>) -> Self {
71        Self {
72            inner,
73            _phantom: PhantomData,
74        }
75    }
76
77    /// Deserialize a JSON value to type T
78    ///
79    /// This is the only place type T matters. Deserialization is lazy (per-item)
80    /// to skip deserializing filtered-out rows.
81    fn deserialize_value(value: Value) -> Result<T> {
82        let type_name = std::any::type_name::<T>().to_string();
83        let deser_start = std::time::Instant::now();
84
85        match serde_json::from_value::<T>(value) {
86            Ok(result) => {
87                let duration_ms = deser_start.elapsed().as_millis() as u64;
88                crate::metrics::histograms::deserialization_duration(
89                    "unknown",
90                    &type_name,
91                    duration_ms,
92                );
93                crate::metrics::counters::deserialization_success("unknown", &type_name);
94                Ok(result)
95            }
96            Err(e) => {
97                crate::metrics::counters::deserialization_failure(
98                    "unknown",
99                    &type_name,
100                    "serde_error",
101                );
102                Err(Error::Deserialization {
103                    type_name,
104                    details: e.to_string(),
105                })
106            }
107        }
108    }
109}
110
111impl<T: DeserializeOwned + Unpin> Stream for TypedJsonStream<T> {
112    type Item = Result<T>;
113
114    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
115        match self.inner.poll_next_unpin(cx) {
116            Poll::Ready(Some(Ok(value))) => {
117                // Deserialize happens HERE, at poll_next
118                // This is the only place type T affects behavior
119                Poll::Ready(Some(Self::deserialize_value(value)))
120            }
121            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
122            Poll::Ready(None) => Poll::Ready(None),
123            Poll::Pending => Poll::Pending,
124        }
125    }
126}
127
128#[cfg(test)]
129mod tests {
130    #![allow(clippy::unwrap_used)] // Reason: test code, panics are acceptable
131    use super::*;
132
133    #[test]
134    fn test_typed_stream_creation() {
135        // Verify TypedJsonStream can be created with different types
136        let _stream: TypedJsonStream<serde_json::Value> =
137            TypedJsonStream::new(Box::new(futures::stream::empty()));
138
139        #[derive(serde::Deserialize, Debug)]
140        #[allow(dead_code)] // Reason: test fixture struct; fields read only by serde deserialization verification
141        struct TestType {
142            id: String,
143        }
144
145        let _stream: TypedJsonStream<TestType> =
146            TypedJsonStream::new(Box::new(futures::stream::empty()));
147    }
148
149    #[test]
150    fn test_deserialize_valid_value() {
151        let json = serde_json::json!({
152            "id": "123",
153            "name": "Test"
154        });
155
156        #[derive(serde::Deserialize)]
157        #[allow(dead_code)] // Reason: test fixture struct; fields read only by serde deserialization verification
158        struct TestType {
159            id: String,
160            name: String,
161        }
162
163        let result = TypedJsonStream::<TestType>::deserialize_value(json);
164        assert!(result.is_ok());
165        let item = result.unwrap();
166        assert_eq!(item.id, "123");
167        assert_eq!(item.name, "Test");
168    }
169
170    #[test]
171    fn test_deserialize_missing_field() {
172        let json = serde_json::json!({
173            "id": "123"
174            // missing "name" field
175        });
176
177        #[derive(Debug, serde::Deserialize)]
178        #[allow(dead_code)] // Reason: test fixture struct; fields read only by serde deserialization verification
179        struct TestType {
180            id: String,
181            name: String,
182        }
183
184        let result = TypedJsonStream::<TestType>::deserialize_value(json);
185        assert!(result.is_err());
186
187        let err = result.unwrap_err();
188        match err {
189            Error::Deserialization { type_name, details } => {
190                assert!(type_name.contains("TestType"));
191                assert!(details.contains("name"));
192            }
193            _ => panic!("Expected Deserialization error"),
194        }
195    }
196
197    #[test]
198    fn test_deserialize_type_mismatch() {
199        let json = serde_json::json!({
200            "id": "123",
201            "count": "not a number"  // should be i32
202        });
203
204        #[derive(Debug, serde::Deserialize)]
205        #[allow(dead_code)] // Reason: test fixture struct; fields read only by serde deserialization verification
206        struct TestType {
207            id: String,
208            count: i32,
209        }
210
211        let result = TypedJsonStream::<TestType>::deserialize_value(json);
212        assert!(result.is_err());
213
214        let err = result.unwrap_err();
215        match err {
216            Error::Deserialization { type_name, details } => {
217                assert!(type_name.contains("TestType"));
218                assert!(details.contains("invalid") || details.contains("type"));
219            }
220            _ => panic!("Expected Deserialization error"),
221        }
222    }
223
224    #[test]
225    fn test_deserialize_value_type() {
226        let json = serde_json::json!({
227            "id": "123",
228            "name": "Test"
229        });
230
231        // Test that Value (escape hatch) works
232        let result = TypedJsonStream::<serde_json::Value>::deserialize_value(json.clone());
233        assert!(result.is_ok());
234        assert_eq!(result.unwrap(), json);
235    }
236
237    #[test]
238    fn test_phantom_data_has_no_size() {
239        use std::mem::size_of;
240
241        // Verify PhantomData adds zero size
242        let size_without_phantom = size_of::<Box<dyn Stream<Item = Result<Value>> + Unpin>>();
243        let size_with_phantom = size_of::<TypedJsonStream<serde_json::Value>>();
244
245        // PhantomData should not increase size
246        // (might be equal or slightly different due to alignment, but not significantly larger)
247        assert!(
248            size_with_phantom <= size_without_phantom + 8,
249            "PhantomData added too much size: {} vs {}",
250            size_with_phantom,
251            size_without_phantom
252        );
253    }
254}