fraiseql_wire/stream/
typed_stream.rs1use crate::{Error, Result};
8use futures::stream::Stream;
9use futures::StreamExt;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use std::marker::PhantomData;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16pub struct TypedJsonStream<T: DeserializeOwned> {
51 inner: Box<dyn Stream<Item = Result<Value>> + Unpin>,
53 _phantom: PhantomData<T>,
55}
56
57impl<T: DeserializeOwned> TypedJsonStream<T> {
58 pub fn new(inner: Box<dyn Stream<Item = Result<Value>> + Unpin>) -> Self {
60 Self {
61 inner,
62 _phantom: PhantomData,
63 }
64 }
65
66 fn deserialize_value(value: Value) -> Result<T> {
71 let type_name = std::any::type_name::<T>().to_string();
72 let deser_start = std::time::Instant::now();
73
74 match serde_json::from_value::<T>(value) {
75 Ok(result) => {
76 let duration_ms = deser_start.elapsed().as_millis() as u64;
77 crate::metrics::histograms::deserialization_duration(
78 "unknown",
79 &type_name,
80 duration_ms,
81 );
82 crate::metrics::counters::deserialization_success("unknown", &type_name);
83 Ok(result)
84 }
85 Err(e) => {
86 crate::metrics::counters::deserialization_failure(
87 "unknown",
88 &type_name,
89 "serde_error",
90 );
91 Err(Error::Deserialization {
92 type_name,
93 details: e.to_string(),
94 })
95 }
96 }
97 }
98}
99
100impl<T: DeserializeOwned + Unpin> Stream for TypedJsonStream<T> {
101 type Item = Result<T>;
102
103 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
104 match self.inner.poll_next_unpin(cx) {
105 Poll::Ready(Some(Ok(value))) => {
106 Poll::Ready(Some(Self::deserialize_value(value)))
109 }
110 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
111 Poll::Ready(None) => Poll::Ready(None),
112 Poll::Pending => Poll::Pending,
113 }
114 }
115}
116
#[cfg(test)]
mod tests {
    use super::*;

    /// The constructor accepts boxed streams for both `Value` and a custom type.
    #[test]
    fn test_typed_stream_creation() {
        let _stream: TypedJsonStream<serde_json::Value> =
            TypedJsonStream::new(Box::new(futures::stream::empty()));

        #[derive(serde::Deserialize, Debug)]
        #[allow(dead_code)]
        struct TestType {
            id: String,
        }

        let _stream: TypedJsonStream<TestType> =
            TypedJsonStream::new(Box::new(futures::stream::empty()));
    }

    /// A JSON object with every expected field deserializes into the struct.
    #[test]
    fn test_deserialize_valid_value() {
        let json = serde_json::json!({
            "id": "123",
            "name": "Test"
        });

        #[derive(serde::Deserialize)]
        #[allow(dead_code)]
        struct TestType {
            id: String,
            name: String,
        }

        let item = TypedJsonStream::<TestType>::deserialize_value(json)
            .expect("a complete JSON object should deserialize");
        assert_eq!(item.id, "123");
        assert_eq!(item.name, "Test");
    }

    /// A missing required field is reported as a `Deserialization` error that
    /// names both the target type and the missing field.
    #[test]
    fn test_deserialize_missing_field() {
        let json = serde_json::json!({
            "id": "123"
        });

        #[derive(Debug, serde::Deserialize)]
        #[allow(dead_code)]
        struct TestType {
            id: String,
            name: String,
        }

        let err = TypedJsonStream::<TestType>::deserialize_value(json)
            .expect_err("a missing required field should fail to deserialize");

        match err {
            Error::Deserialization { type_name, details } => {
                assert!(type_name.contains("TestType"));
                assert!(details.contains("name"));
            }
            other => panic!("Expected Deserialization error, got {:?}", other),
        }
    }

    /// A field of the wrong JSON type is reported as a `Deserialization` error.
    #[test]
    fn test_deserialize_type_mismatch() {
        let json = serde_json::json!({
            "id": "123",
            "count": "not a number"
        });

        #[derive(Debug, serde::Deserialize)]
        #[allow(dead_code)]
        struct TestType {
            id: String,
            count: i32,
        }

        let err = TypedJsonStream::<TestType>::deserialize_value(json)
            .expect_err("a string in an i32 field should fail to deserialize");

        match err {
            Error::Deserialization { type_name, details } => {
                assert!(type_name.contains("TestType"));
                assert!(details.contains("invalid") || details.contains("type"));
            }
            other => panic!("Expected Deserialization error, got {:?}", other),
        }
    }

    /// Deserializing into `Value` itself returns the input unchanged.
    #[test]
    fn test_deserialize_value_type() {
        let json = serde_json::json!({
            "id": "123",
            "name": "Test"
        });

        let value = TypedJsonStream::<serde_json::Value>::deserialize_value(json.clone())
            .expect("Value should always deserialize from Value");
        assert_eq!(value, json);
    }

    /// The type marker must not make the stream larger than its boxed inner stream.
    #[test]
    fn test_phantom_data_has_no_size() {
        use std::mem::size_of;

        let size_without_phantom = size_of::<Box<dyn Stream<Item = Result<Value>> + Unpin>>();
        let size_with_phantom = size_of::<TypedJsonStream<serde_json::Value>>();

        // The marker itself is zero-sized...
        assert_eq!(size_of::<PhantomData<serde_json::Value>>(), 0);
        // ...so the wrapper must be exactly as large as the boxed stream. Allowing
        // slack here would let a real size regression slip through.
        assert_eq!(
            size_with_phantom, size_without_phantom,
            "PhantomData added size: {} vs {}",
            size_with_phantom, size_without_phantom
        );
    }
}