Skip to main content

fraiseql_wire/stream/
typed_stream.rs

1//! Typed JSON stream implementation
2//!
3//! TypedJsonStream wraps a raw JSON stream and deserializes each item to a target type T.
4//! Type T is **consumer-side only** - it does NOT affect SQL generation, filtering,
5//! ordering, or wire protocol. Deserialization happens lazily at poll_next().
6
7use crate::{Error, Result};
8use futures::stream::Stream;
9use futures::StreamExt;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use std::marker::PhantomData;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16/// Typed JSON stream that deserializes rows to type T
17///
18/// This stream wraps a raw JSON stream and deserializes each Value to the target type T.
19///
20/// **Important**: Type T is **consumer-side only**.
21/// - T does NOT affect SQL generation (still `SELECT data FROM v_{entity}`)
22/// - T does NOT affect filtering (where_sql, where_rust, order_by)
23/// - T does NOT affect wire protocol (identical for all T)
24/// - T ONLY affects consumer-side deserialization at poll_next()
25///
26/// # Examples
27///
28/// ```ignore
29/// use serde::Deserialize;
30///
31/// #[derive(Deserialize)]
32/// struct Project {
33///     id: String,
34///     name: String,
35/// }
36///
37/// let mut stream = client.query::<Project>("projects").execute().await?;
38/// while let Some(result) = stream.next().await {
39///     let project: Project = result?;
40///     println!("Project: {}", project.name);
41/// }
42///
43/// // Escape hatch: Always use Value if needed
44/// let mut stream = client.query::<serde_json::Value>("projects").execute().await?;
45/// while let Some(result) = stream.next().await {
46///     let json: Value = result?;
47///     println!("Raw JSON: {:?}", json);
48/// }
49/// ```
50pub struct TypedJsonStream<T: DeserializeOwned> {
51    /// Inner stream of JSON values
52    inner: Box<dyn Stream<Item = Result<Value>> + Unpin>,
53    /// Phantom data for type T (zero runtime cost)
54    _phantom: PhantomData<T>,
55}
56
57impl<T: DeserializeOwned> TypedJsonStream<T> {
58    /// Create a new typed stream from a raw JSON stream
59    pub fn new(inner: Box<dyn Stream<Item = Result<Value>> + Unpin>) -> Self {
60        Self {
61            inner,
62            _phantom: PhantomData,
63        }
64    }
65
66    /// Deserialize a JSON value to type T
67    ///
68    /// This is the only place type T matters. Deserialization is lazy (per-item)
69    /// to skip deserializing filtered-out rows.
70    fn deserialize_value(value: Value) -> Result<T> {
71        let type_name = std::any::type_name::<T>().to_string();
72        let deser_start = std::time::Instant::now();
73
74        match serde_json::from_value::<T>(value) {
75            Ok(result) => {
76                let duration_ms = deser_start.elapsed().as_millis() as u64;
77                crate::metrics::histograms::deserialization_duration(
78                    "unknown",
79                    &type_name,
80                    duration_ms,
81                );
82                crate::metrics::counters::deserialization_success("unknown", &type_name);
83                Ok(result)
84            }
85            Err(e) => {
86                crate::metrics::counters::deserialization_failure(
87                    "unknown",
88                    &type_name,
89                    "serde_error",
90                );
91                Err(Error::Deserialization {
92                    type_name,
93                    details: e.to_string(),
94                })
95            }
96        }
97    }
98}
99
100impl<T: DeserializeOwned + Unpin> Stream for TypedJsonStream<T> {
101    type Item = Result<T>;
102
103    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
104        match self.inner.poll_next_unpin(cx) {
105            Poll::Ready(Some(Ok(value))) => {
106                // Deserialize happens HERE, at poll_next
107                // This is the only place type T affects behavior
108                Poll::Ready(Some(Self::deserialize_value(value)))
109            }
110            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
111            Poll::Ready(None) => Poll::Ready(None),
112            Poll::Pending => Poll::Pending,
113        }
114    }
115}
116
#[cfg(test)]
mod tests {
    use super::*;

    /// Construction must type-check for `Value` and for a user-defined type.
    #[test]
    fn test_typed_stream_creation() {
        // Verify TypedJsonStream can be created with different types
        let _stream: TypedJsonStream<serde_json::Value> =
            TypedJsonStream::new(Box::new(futures::stream::empty()));

        #[derive(serde::Deserialize, Debug)]
        #[allow(dead_code)]
        struct TestType {
            id: String,
        }

        let _stream: TypedJsonStream<TestType> =
            TypedJsonStream::new(Box::new(futures::stream::empty()));
    }

    /// A JSON object with all expected fields deserializes into the struct.
    #[test]
    fn test_deserialize_valid_value() {
        let json = serde_json::json!({
            "id": "123",
            "name": "Test"
        });

        #[derive(serde::Deserialize)]
        #[allow(dead_code)]
        struct TestType {
            id: String,
            name: String,
        }

        let result = TypedJsonStream::<TestType>::deserialize_value(json);
        assert!(result.is_ok());
        let item = result.unwrap();
        assert_eq!(item.id, "123");
        assert_eq!(item.name, "Test");
    }

    /// A missing required field is reported as `Error::Deserialization`
    /// naming both the target type and the missing field.
    #[test]
    fn test_deserialize_missing_field() {
        let json = serde_json::json!({
            "id": "123"
            // missing "name" field
        });

        #[derive(Debug, serde::Deserialize)]
        #[allow(dead_code)]
        struct TestType {
            id: String,
            name: String,
        }

        let result = TypedJsonStream::<TestType>::deserialize_value(json);
        assert!(result.is_err());

        let err = result.unwrap_err();
        match err {
            Error::Deserialization { type_name, details } => {
                assert!(type_name.contains("TestType"));
                assert!(details.contains("name"));
            }
            _ => panic!("Expected Deserialization error"),
        }
    }

    /// A field with the wrong JSON type is reported as `Error::Deserialization`.
    #[test]
    fn test_deserialize_type_mismatch() {
        let json = serde_json::json!({
            "id": "123",
            "count": "not a number"  // should be i32
        });

        #[derive(Debug, serde::Deserialize)]
        #[allow(dead_code)]
        struct TestType {
            id: String,
            count: i32,
        }

        let result = TypedJsonStream::<TestType>::deserialize_value(json);
        assert!(result.is_err());

        let err = result.unwrap_err();
        match err {
            Error::Deserialization { type_name, details } => {
                assert!(type_name.contains("TestType"));
                assert!(details.contains("invalid") || details.contains("type"));
            }
            _ => panic!("Expected Deserialization error"),
        }
    }

    /// Using `Value` as the target type returns the input unchanged.
    #[test]
    fn test_deserialize_value_type() {
        let json = serde_json::json!({
            "id": "123",
            "name": "Test"
        });

        // Test that Value (escape hatch) works
        let result = TypedJsonStream::<serde_json::Value>::deserialize_value(json.clone());
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), json);
    }

    /// The `PhantomData` marker must not add any bytes to the stream struct.
    #[test]
    fn test_phantom_data_has_no_size() {
        use std::mem::size_of;

        // PhantomData is zero-sized, so the wrapper must be exactly as large as
        // the boxed inner stream. An exact comparison (rather than allowing
        // slack) makes this test fail if a real field is ever added.
        assert_eq!(
            size_of::<TypedJsonStream<serde_json::Value>>(),
            size_of::<Box<dyn Stream<Item = Result<Value>> + Unpin>>(),
            "PhantomData must not add any size to TypedJsonStream"
        );
    }

    /// Drives `poll_next` directly: good rows are deserialized, upstream errors
    /// are forwarded untouched, bad rows become deserialization errors, and the
    /// end of the inner stream ends the typed stream.
    #[test]
    fn test_poll_next_deserializes_and_forwards_errors() {
        #[derive(Debug, serde::Deserialize)]
        struct TestType {
            id: String,
        }

        let rows: Vec<Result<Value>> = vec![
            Ok(serde_json::json!({ "id": "1" })),
            Err(Error::Deserialization {
                type_name: "upstream".to_string(),
                details: "boom".to_string(),
            }),
            Ok(serde_json::json!({ "id": 2 })),
        ];
        let mut stream: TypedJsonStream<TestType> =
            TypedJsonStream::new(Box::new(futures::stream::iter(rows)));

        // An in-memory iterator stream is always ready, so a no-op waker suffices.
        let mut cx = Context::from_waker(futures::task::noop_waker_ref());

        match stream.poll_next_unpin(&mut cx) {
            Poll::Ready(Some(Ok(item))) => assert_eq!(item.id, "1"),
            _ => panic!("Expected first row to deserialize"),
        }

        match stream.poll_next_unpin(&mut cx) {
            Poll::Ready(Some(Err(Error::Deserialization { type_name, details }))) => {
                assert_eq!(type_name, "upstream");
                assert_eq!(details, "boom");
            }
            _ => panic!("Expected upstream error to be forwarded unchanged"),
        }

        match stream.poll_next_unpin(&mut cx) {
            Poll::Ready(Some(Err(Error::Deserialization { type_name, .. }))) => {
                assert!(type_name.contains("TestType"));
            }
            _ => panic!("Expected Deserialization error for mistyped row"),
        }

        assert!(matches!(
            stream.poll_next_unpin(&mut cx),
            Poll::Ready(None)
        ));
    }
}