fraiseql_wire/stream/
typed_stream.rs1use crate::{Error, Result};
8use futures::stream::Stream;
9use futures::StreamExt;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use std::marker::PhantomData;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16pub struct TypedJsonStream<T: DeserializeOwned> {
58 inner: Box<dyn Stream<Item = Result<Value>> + Send + Unpin>,
64 _phantom: PhantomData<T>,
66}
67
68impl<T: DeserializeOwned> TypedJsonStream<T> {
69 pub fn new(inner: Box<dyn Stream<Item = Result<Value>> + Send + Unpin>) -> Self {
71 Self {
72 inner,
73 _phantom: PhantomData,
74 }
75 }
76
77 fn deserialize_value(value: Value) -> Result<T> {
82 let type_name = std::any::type_name::<T>().to_string();
83 let deser_start = std::time::Instant::now();
84
85 match serde_json::from_value::<T>(value) {
86 Ok(result) => {
87 let duration_ms = deser_start.elapsed().as_millis() as u64;
88 crate::metrics::histograms::deserialization_duration(
89 "unknown",
90 &type_name,
91 duration_ms,
92 );
93 crate::metrics::counters::deserialization_success("unknown", &type_name);
94 Ok(result)
95 }
96 Err(e) => {
97 crate::metrics::counters::deserialization_failure(
98 "unknown",
99 &type_name,
100 "serde_error",
101 );
102 Err(Error::Deserialization {
103 type_name,
104 details: e.to_string(),
105 })
106 }
107 }
108 }
109}
110
111impl<T: DeserializeOwned + Unpin> Stream for TypedJsonStream<T> {
112 type Item = Result<T>;
113
114 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
115 match self.inner.poll_next_unpin(cx) {
116 Poll::Ready(Some(Ok(value))) => {
117 Poll::Ready(Some(Self::deserialize_value(value)))
120 }
121 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
122 Poll::Ready(None) => Poll::Ready(None),
123 Poll::Pending => Poll::Pending,
124 }
125 }
126}
127
#[cfg(test)]
mod tests {
    // Tests unwrap freely: a panic here is simply a test failure.
    #![allow(clippy::unwrap_used)]
    use super::*;

    /// Construction type-checks for both `Value` and a user-defined target type.
    #[test]
    fn test_typed_stream_creation() {
        let _stream: TypedJsonStream<serde_json::Value> =
            TypedJsonStream::new(Box::new(futures::stream::empty()));

        #[derive(serde::Deserialize, Debug)]
        // The field is only populated by serde, never read.
        #[allow(dead_code)]
        struct TestType {
            id: String,
        }

        let _stream: TypedJsonStream<TestType> =
            TypedJsonStream::new(Box::new(futures::stream::empty()));
    }

    /// A JSON object with all required fields deserializes into the struct.
    #[test]
    fn test_deserialize_valid_value() {
        let json = serde_json::json!({
            "id": "123",
            "name": "Test"
        });

        #[derive(serde::Deserialize)]
        #[allow(dead_code)]
        struct TestType {
            id: String,
            name: String,
        }

        let result = TypedJsonStream::<TestType>::deserialize_value(json);
        assert!(result.is_ok());
        let item = result.unwrap();
        assert_eq!(item.id, "123");
        assert_eq!(item.name, "Test");
    }

    /// A missing required field is reported as `Error::Deserialization`
    /// naming both the target type and the missing field.
    #[test]
    fn test_deserialize_missing_field() {
        let json = serde_json::json!({
            "id": "123"
        });

        #[derive(Debug, serde::Deserialize)]
        // The fields are only populated by serde, never read.
        #[allow(dead_code)]
        struct TestType {
            id: String,
            name: String,
        }

        let result = TypedJsonStream::<TestType>::deserialize_value(json);
        assert!(result.is_err());

        let err = result.unwrap_err();
        match err {
            Error::Deserialization { type_name, details } => {
                assert!(type_name.contains("TestType"));
                assert!(details.contains("name"));
            }
            _ => panic!("Expected Deserialization error"),
        }
    }

    /// A value of the wrong JSON type is reported as `Error::Deserialization`.
    #[test]
    fn test_deserialize_type_mismatch() {
        let json = serde_json::json!({
            "id": "123",
            "count": "not a number"
        });

        #[derive(Debug, serde::Deserialize)]
        // The fields are only populated by serde, never read.
        #[allow(dead_code)]
        struct TestType {
            id: String,
            count: i32,
        }

        let result = TypedJsonStream::<TestType>::deserialize_value(json);
        assert!(result.is_err());

        let err = result.unwrap_err();
        match err {
            Error::Deserialization { type_name, details } => {
                assert!(type_name.contains("TestType"));
                assert!(details.contains("invalid") || details.contains("type"));
            }
            _ => panic!("Expected Deserialization error"),
        }
    }

    /// Targeting `serde_json::Value` returns the input unchanged.
    #[test]
    fn test_deserialize_value_type() {
        let json = serde_json::json!({
            "id": "123",
            "name": "Test"
        });

        let result = TypedJsonStream::<serde_json::Value>::deserialize_value(json.clone());
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), json);
    }

    /// The `PhantomData` marker must not enlarge the struct beyond the boxed stream.
    #[test]
    fn test_phantom_data_has_no_size() {
        use std::mem::size_of;

        let size_without_phantom = size_of::<Box<dyn Stream<Item = Result<Value>> + Unpin>>();
        let size_with_phantom = size_of::<TypedJsonStream<serde_json::Value>>();

        // Exact equality: any slack would let an accidental extra field slip through.
        assert_eq!(
            size_with_phantom, size_without_phantom,
            "PhantomData added too much size: {} vs {}",
            size_with_phantom, size_without_phantom
        );
    }

    /// Polling the stream deserializes each upstream value in order, surfaces
    /// a deserialization failure as an `Err` item, and then ends.
    #[test]
    fn test_stream_deserializes_each_item() {
        use std::sync::Arc;
        use std::task::{Wake, Waker};

        /// Waker that does nothing; the in-memory stream below is always ready.
        struct NoopWake;

        impl Wake for NoopWake {
            fn wake(self: Arc<Self>) {}
        }

        #[derive(serde::Deserialize)]
        struct Row {
            id: String,
        }

        let rows: Vec<Result<Value>> = vec![
            Ok(serde_json::json!({ "id": "a" })),
            Ok(serde_json::json!({ "id": 7 })),
        ];
        let mut stream: TypedJsonStream<Row> =
            TypedJsonStream::new(Box::new(futures::stream::iter(rows)));

        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);

        match stream.poll_next_unpin(&mut cx) {
            Poll::Ready(Some(Ok(row))) => assert_eq!(row.id, "a"),
            _ => panic!("Expected first item to deserialize"),
        }

        match stream.poll_next_unpin(&mut cx) {
            Poll::Ready(Some(Err(Error::Deserialization { type_name, .. }))) => {
                assert!(type_name.contains("Row"));
            }
            _ => panic!("Expected Deserialization error for second item"),
        }

        assert!(matches!(stream.poll_next_unpin(&mut cx), Poll::Ready(None)));
    }
}