Skip to main content

fraiseql_wire/stream/
typed_stream.rs

1//! Typed JSON stream implementation
2//!
3//! TypedJsonStream wraps a raw JSON stream and deserializes each item to a target type T.
4//! Type T is **consumer-side only** - it does NOT affect SQL generation, filtering,
5//! ordering, or wire protocol. Deserialization happens lazily at poll_next().
6
7use crate::{Error, Result};
8use futures::stream::Stream;
9use futures::StreamExt;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use std::marker::PhantomData;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16/// Typed JSON stream that deserializes rows to type T
17///
18/// This stream wraps a raw JSON stream and deserializes each Value to the target type T.
19///
20/// **Important**: Type T is **consumer-side only**.
21/// - T does NOT affect SQL generation (still `SELECT data FROM v_{entity}`)
22/// - T does NOT affect filtering (where_sql, where_rust, order_by)
23/// - T does NOT affect wire protocol (identical for all T)
24/// - T ONLY affects consumer-side deserialization at poll_next()
25///
26/// # Examples
27///
28/// ```ignore
29/// use serde::Deserialize;
30///
31/// #[derive(Deserialize)]
32/// struct Project {
33///     id: String,
34///     name: String,
35/// }
36///
37/// let mut stream = client.query::<Project>("projects").execute().await?;
38/// while let Some(result) = stream.next().await {
39///     let project: Project = result?;
40///     println!("Project: {}", project.name);
41/// }
42///
43/// // Escape hatch: Always use Value if needed
44/// let mut stream = client.query::<serde_json::Value>("projects").execute().await?;
45/// while let Some(result) = stream.next().await {
46///     let json: Value = result?;
47///     println!("Raw JSON: {:?}", json);
48/// }
49/// ```
50pub struct TypedJsonStream<T: DeserializeOwned> {
51    /// Inner stream of JSON values
52    inner: Box<dyn Stream<Item = Result<Value>> + Unpin>,
53    /// Phantom data for type T (zero runtime cost)
54    _phantom: PhantomData<T>,
55}
56
57impl<T: DeserializeOwned> TypedJsonStream<T> {
58    /// Create a new typed stream from a raw JSON stream
59    pub fn new(inner: Box<dyn Stream<Item = Result<Value>> + Unpin>) -> Self {
60        Self {
61            inner,
62            _phantom: PhantomData,
63        }
64    }
65
66    /// Deserialize a JSON value to type T
67    ///
68    /// This is the only place type T matters. Deserialization is lazy (per-item)
69    /// to skip deserializing filtered-out rows.
70    fn deserialize_value(value: Value) -> Result<T> {
71        let type_name = std::any::type_name::<T>().to_string();
72        let deser_start = std::time::Instant::now();
73
74        match serde_json::from_value::<T>(value) {
75            Ok(result) => {
76                let duration_ms = deser_start.elapsed().as_millis() as u64;
77                crate::metrics::histograms::deserialization_duration(
78                    "unknown",
79                    &type_name,
80                    duration_ms,
81                );
82                crate::metrics::counters::deserialization_success("unknown", &type_name);
83                Ok(result)
84            }
85            Err(e) => {
86                crate::metrics::counters::deserialization_failure(
87                    "unknown",
88                    &type_name,
89                    "serde_error",
90                );
91                Err(Error::Deserialization {
92                    type_name,
93                    details: e.to_string(),
94                })
95            }
96        }
97    }
98}
99
100impl<T: DeserializeOwned + Unpin> Stream for TypedJsonStream<T> {
101    type Item = Result<T>;
102
103    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
104        match self.inner.poll_next_unpin(cx) {
105            Poll::Ready(Some(Ok(value))) => {
106                // Deserialize happens HERE, at poll_next
107                // This is the only place type T affects behavior
108                Poll::Ready(Some(Self::deserialize_value(value)))
109            }
110            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
111            Poll::Ready(None) => Poll::Ready(None),
112            Poll::Pending => Poll::Pending,
113        }
114    }
115}
116
#[cfg(test)]
mod tests {
    use super::*;

    /// The stream can be constructed for both `Value` and a user-defined type.
    #[test]
    fn test_typed_stream_creation() {
        let _stream: TypedJsonStream<serde_json::Value> =
            TypedJsonStream::new(Box::new(futures::stream::empty()));

        #[derive(serde::Deserialize, Debug)]
        // Reason: test fixture struct used only for deserialization verification
        #[allow(dead_code)]
        struct TestType {
            id: String,
        }

        let _stream: TypedJsonStream<TestType> =
            TypedJsonStream::new(Box::new(futures::stream::empty()));
    }

    /// A JSON object with all expected fields deserializes into the struct.
    #[test]
    fn test_deserialize_valid_value() {
        let json = serde_json::json!({
            "id": "123",
            "name": "Test"
        });

        #[derive(serde::Deserialize)]
        // Reason: test fixture struct used only for deserialization verification
        #[allow(dead_code)]
        struct TestType {
            id: String,
            name: String,
        }

        // `expect` reports the underlying error if deserialization fails,
        // which a bare `assert!(is_ok())` would hide.
        let item = TypedJsonStream::<TestType>::deserialize_value(json)
            .expect("well-formed JSON should deserialize into TestType");
        assert_eq!(item.id, "123");
        assert_eq!(item.name, "Test");
    }

    /// A missing required field yields `Error::Deserialization` naming it.
    #[test]
    fn test_deserialize_missing_field() {
        let json = serde_json::json!({
            "id": "123"
            // missing "name" field
        });

        #[derive(Debug, serde::Deserialize)]
        // Reason: test fixture struct used only for deserialization verification
        #[allow(dead_code)]
        struct TestType {
            id: String,
            name: String,
        }

        let err = TypedJsonStream::<TestType>::deserialize_value(json)
            .expect_err("JSON without `name` must not deserialize");

        match err {
            Error::Deserialization { type_name, details } => {
                assert!(type_name.contains("TestType"));
                assert!(details.contains("name"));
            }
            _ => panic!("Expected Deserialization error"),
        }
    }

    /// A field of the wrong JSON type yields `Error::Deserialization`.
    #[test]
    fn test_deserialize_type_mismatch() {
        let json = serde_json::json!({
            "id": "123",
            "count": "not a number"  // should be i32
        });

        #[derive(Debug, serde::Deserialize)]
        // Reason: test fixture struct used only for deserialization verification
        #[allow(dead_code)]
        struct TestType {
            id: String,
            count: i32,
        }

        let err = TypedJsonStream::<TestType>::deserialize_value(json)
            .expect_err("a string in an i32 field must not deserialize");

        match err {
            Error::Deserialization { type_name, details } => {
                assert!(type_name.contains("TestType"));
                assert!(details.contains("invalid") || details.contains("type"));
            }
            _ => panic!("Expected Deserialization error"),
        }
    }

    /// `Value` (the escape hatch) round-trips unchanged.
    #[test]
    fn test_deserialize_value_type() {
        let json = serde_json::json!({
            "id": "123",
            "name": "Test"
        });

        let value = TypedJsonStream::<serde_json::Value>::deserialize_value(json.clone())
            .expect("Value should always deserialize from Value");
        assert_eq!(value, json);
    }

    /// The `PhantomData` marker is zero-sized, so the wrapper must be exactly
    /// as large as the boxed inner stream it holds.
    #[test]
    fn test_phantom_data_has_no_size() {
        use std::mem::size_of;

        let size_without_phantom = size_of::<Box<dyn Stream<Item = Result<Value>> + Unpin>>();
        let size_with_phantom = size_of::<TypedJsonStream<serde_json::Value>>();

        // Exact equality: the previous `<= + 8` slack would have let a real
        // extra field slip through unnoticed.
        assert_eq!(
            size_with_phantom, size_without_phantom,
            "PhantomData must not add any size to TypedJsonStream"
        );
    }
}