Skip to main content

fraiseql_wire/stream/
typed_stream.rs

1//! Typed JSON stream implementation
2//!
3//! `TypedJsonStream` wraps a raw JSON stream and deserializes each item to a target type T.
4//! Type T is **consumer-side only** - it does NOT affect SQL generation, filtering,
5//! ordering, or wire protocol. Deserialization happens lazily at `poll_next()`.
6
7use crate::{Result, WireError};
8use futures::stream::Stream;
9use futures::StreamExt;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use std::marker::PhantomData;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16/// Typed JSON stream that deserializes rows to type T
17///
18/// This stream wraps a raw JSON stream and deserializes each Value to the target type T.
19///
20/// **Important**: Type T is **consumer-side only**.
21/// - T does NOT affect SQL generation (still `SELECT data FROM v_{entity}`)
22/// - T does NOT affect filtering (`where_sql`, `where_rust`, `order_by`)
23/// - T does NOT affect wire protocol (identical for all T)
24/// - T ONLY affects consumer-side deserialization at `poll_next()`
25///
26/// # Examples
27///
28/// ```text
29/// // Requires: live Postgres connection via FraiseClient.
30/// // Note: FraiseClient::query() takes ownership of self; create separate clients for
31/// // separate queries in production code.
32/// use serde::Deserialize;
33/// use futures::stream::StreamExt;
34///
35/// #[derive(Deserialize)]
36/// struct Project { id: String, name: String }
37///
38/// let mut stream = client.query::<Project>("projects").execute().await?;
39/// while let Some(result) = stream.next().await {
40///     let project: Project = result?;
41///     println!("Project: {}", project.name);
42/// }
43/// ```
44pub struct TypedJsonStream<T: DeserializeOwned> {
45    /// Inner stream of JSON values.
46    ///
47    /// The `Send` bound ensures that `TypedJsonStream` itself is `Send`,
48    /// allowing it to be held across `.await` points in async tasks and
49    /// transferred between threads (e.g. via `tokio::spawn`).
50    inner: Box<dyn Stream<Item = Result<Value>> + Send + Unpin>,
51    /// Phantom data for type T (zero runtime cost)
52    _phantom: PhantomData<T>,
53}
54
55impl<T: DeserializeOwned> TypedJsonStream<T> {
56    /// Create a new typed stream from a raw JSON stream
57    pub fn new(inner: Box<dyn Stream<Item = Result<Value>> + Send + Unpin>) -> Self {
58        Self {
59            inner,
60            _phantom: PhantomData,
61        }
62    }
63
64    /// Deserialize a JSON value to type T
65    ///
66    /// This is the only place type T matters. Deserialization is lazy (per-item)
67    /// to skip deserializing filtered-out rows.
68    fn deserialize_value(value: Value) -> Result<T> {
69        let type_name = std::any::type_name::<T>().to_string();
70        let deser_start = std::time::Instant::now();
71
72        match serde_json::from_value::<T>(value) {
73            Ok(result) => {
74                let duration_ms = deser_start.elapsed().as_millis() as u64;
75                crate::metrics::histograms::deserialization_duration(
76                    "unknown",
77                    &type_name,
78                    duration_ms,
79                );
80                crate::metrics::counters::deserialization_success("unknown", &type_name);
81                Ok(result)
82            }
83            Err(e) => {
84                crate::metrics::counters::deserialization_failure(
85                    "unknown",
86                    &type_name,
87                    "serde_error",
88                );
89                Err(WireError::Deserialization {
90                    type_name,
91                    details: e.to_string(),
92                })
93            }
94        }
95    }
96}
97
98impl<T: DeserializeOwned + Unpin> Stream for TypedJsonStream<T> {
99    type Item = Result<T>;
100
101    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
102        match self.inner.poll_next_unpin(cx) {
103            Poll::Ready(Some(Ok(value))) => {
104                // Deserialize happens HERE, at poll_next
105                // This is the only place type T affects behavior
106                Poll::Ready(Some(Self::deserialize_value(value)))
107            }
108            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
109            Poll::Ready(None) => Poll::Ready(None),
110            Poll::Pending => Poll::Pending,
111        }
112    }
113}
114
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)] // Reason: test code, panics are acceptable
    use super::*;

    /// The stream can be constructed for both `Value` and a user-defined type.
    #[test]
    fn test_typed_stream_creation() {
        #[derive(serde::Deserialize, Debug)]
        #[allow(dead_code)] // Reason: fixture is only named as a type parameter; its field is never read
        struct TestType {
            id: String,
        }

        let _stream: TypedJsonStream<serde_json::Value> =
            TypedJsonStream::new(Box::new(futures::stream::empty()));

        let _stream: TypedJsonStream<TestType> =
            TypedJsonStream::new(Box::new(futures::stream::empty()));
    }

    /// A JSON object with all expected fields deserializes into the target type.
    #[test]
    fn test_deserialize_valid_value() {
        #[derive(serde::Deserialize)]
        struct TestType {
            id: String,
            name: String,
        }

        let json = serde_json::json!({
            "id": "123",
            "name": "Test"
        });

        let result = TypedJsonStream::<TestType>::deserialize_value(json);
        let item = result.unwrap_or_else(|e| panic!("expected Ok for valid JSON, got: {e}"));
        assert_eq!(item.id, "123");
        assert_eq!(item.name, "Test");
    }

    /// A missing required field is reported as `WireError::Deserialization`.
    #[test]
    fn test_deserialize_missing_field() {
        #[derive(Debug, serde::Deserialize)]
        #[allow(dead_code)] // Reason: deserialization is expected to fail, so fields are never read
        struct TestType {
            id: String,
            name: String,
        }

        let json = serde_json::json!({
            "id": "123"
            // missing "name" field
        });

        let result = TypedJsonStream::<TestType>::deserialize_value(json);
        match result {
            Err(WireError::Deserialization { type_name, details }) => {
                assert!(type_name.contains("TestType"));
                assert!(details.contains("name"));
            }
            other => panic!("expected Deserialization error for missing field, got: {other:?}"),
        }
    }

    /// A field of the wrong JSON type is reported as `WireError::Deserialization`.
    #[test]
    fn test_deserialize_type_mismatch() {
        #[derive(Debug, serde::Deserialize)]
        #[allow(dead_code)] // Reason: deserialization is expected to fail, so fields are never read
        struct TestType {
            id: String,
            count: i32,
        }

        let json = serde_json::json!({
            "id": "123",
            "count": "not a number"  // should be i32
        });

        let result = TypedJsonStream::<TestType>::deserialize_value(json);
        match result {
            Err(WireError::Deserialization { type_name, details }) => {
                assert!(type_name.contains("TestType"));
                assert!(details.contains("invalid") || details.contains("type"));
            }
            other => panic!("expected Deserialization error for type mismatch, got: {other:?}"),
        }
    }

    /// `serde_json::Value` works as an untyped escape hatch and round-trips unchanged.
    #[test]
    fn test_deserialize_value_type() {
        let json = serde_json::json!({
            "id": "123",
            "name": "Test"
        });

        let result = TypedJsonStream::<serde_json::Value>::deserialize_value(json.clone());
        let value =
            result.unwrap_or_else(|e| panic!("expected Ok for Value escape hatch, got: {e}"));
        assert_eq!(value, json);
    }

    /// Polling the stream deserializes `Ok` rows, surfaces bad rows as errors,
    /// and ends when the inner stream ends.
    #[test]
    fn test_poll_next_deserializes_rows_then_ends() {
        #[derive(Debug, serde::Deserialize)]
        struct TestType {
            id: String,
        }

        let rows = futures::stream::iter(vec![
            serde_json::json!({ "id": "a" }),
            serde_json::json!({ "id": 7 }), // wrong type: must surface as an error item
        ])
        .map(Ok::<Value, WireError>);
        let mut stream: TypedJsonStream<TestType> = TypedJsonStream::new(Box::new(rows));

        // The inner stream is always ready, so a no-op waker is sufficient.
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(Ok(item))) => assert_eq!(item.id, "a"),
            other => panic!("expected first row to deserialize, got: {other:?}"),
        }
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(Err(WireError::Deserialization { .. }))) => {}
            other => panic!("expected Deserialization error for second row, got: {other:?}"),
        }
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(None) => {}
            other => panic!("expected end of stream, got: {other:?}"),
        }
    }

    /// The `PhantomData` marker must not add any size on top of the boxed stream.
    #[test]
    fn test_phantom_data_has_no_size() {
        use std::mem::size_of;

        // Compare against the exact type of the `inner` field.
        let size_without_phantom =
            size_of::<Box<dyn Stream<Item = Result<Value>> + Send + Unpin>>();
        let size_with_phantom = size_of::<TypedJsonStream<serde_json::Value>>();

        // `PhantomData` is zero-sized with alignment 1, so the sizes must match exactly.
        assert_eq!(
            size_with_phantom, size_without_phantom,
            "PhantomData added size: {} vs {}",
            size_with_phantom, size_without_phantom
        );
    }
}