fraiseql_wire/stream/
typed_stream.rs1use crate::{Result, WireError};
8use futures::stream::Stream;
9use futures::StreamExt;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use std::marker::PhantomData;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16pub struct TypedJsonStream<T: DeserializeOwned> {
45 inner: Box<dyn Stream<Item = Result<Value>> + Send + Unpin>,
51 _phantom: PhantomData<T>,
53}
54
55impl<T: DeserializeOwned> TypedJsonStream<T> {
56 pub fn new(inner: Box<dyn Stream<Item = Result<Value>> + Send + Unpin>) -> Self {
58 Self {
59 inner,
60 _phantom: PhantomData,
61 }
62 }
63
64 fn deserialize_value(value: Value) -> Result<T> {
69 let type_name = std::any::type_name::<T>().to_string();
70 let deser_start = std::time::Instant::now();
71
72 match serde_json::from_value::<T>(value) {
73 Ok(result) => {
74 let duration_ms = deser_start.elapsed().as_millis() as u64;
75 crate::metrics::histograms::deserialization_duration(
76 "unknown",
77 &type_name,
78 duration_ms,
79 );
80 crate::metrics::counters::deserialization_success("unknown", &type_name);
81 Ok(result)
82 }
83 Err(e) => {
84 crate::metrics::counters::deserialization_failure(
85 "unknown",
86 &type_name,
87 "serde_error",
88 );
89 Err(WireError::Deserialization {
90 type_name,
91 details: e.to_string(),
92 })
93 }
94 }
95 }
96}
97
98impl<T: DeserializeOwned + Unpin> Stream for TypedJsonStream<T> {
99 type Item = Result<T>;
100
101 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
102 match self.inner.poll_next_unpin(cx) {
103 Poll::Ready(Some(Ok(value))) => {
104 Poll::Ready(Some(Self::deserialize_value(value)))
107 }
108 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
109 Poll::Ready(None) => Poll::Ready(None),
110 Poll::Pending => Poll::Pending,
111 }
112 }
113}
114
#[cfg(test)]
mod tests {
    // NOTE(review): no test below calls `.unwrap()` directly; the allow is kept
    // from the original module so future tests may use it.
    #![allow(clippy::unwrap_used)]

    use super::*;

    /// The wrapper can be built for both `Value` and a user-defined type.
    #[test]
    fn test_typed_stream_creation() {
        let _stream: TypedJsonStream<serde_json::Value> =
            TypedJsonStream::new(Box::new(futures::stream::empty()));

        // The field only drives deserialization and is never read.
        #[derive(serde::Deserialize, Debug)]
        #[allow(dead_code)]
        struct TestType {
            id: String,
        }

        let _stream: TypedJsonStream<TestType> =
            TypedJsonStream::new(Box::new(futures::stream::empty()));
    }

    /// A JSON object with all expected fields decodes into the target struct.
    #[test]
    fn test_deserialize_valid_value() {
        let json = serde_json::json!({
            "id": "123",
            "name": "Test"
        });

        #[derive(serde::Deserialize)]
        struct TestType {
            id: String,
            name: String,
        }

        let result = TypedJsonStream::<TestType>::deserialize_value(json);
        let item = result.unwrap_or_else(|e| panic!("expected Ok for valid JSON, got: {e}"));
        assert_eq!(item.id, "123");
        assert_eq!(item.name, "Test");
    }

    /// A missing required field is reported as a `Deserialization` error that
    /// names both the target type and the missing field.
    #[test]
    fn test_deserialize_missing_field() {
        let json = serde_json::json!({
            "id": "123"
        });

        // Fields only drive deserialization and are never read.
        #[derive(Debug, serde::Deserialize)]
        #[allow(dead_code)]
        struct TestType {
            id: String,
            name: String,
        }

        let result = TypedJsonStream::<TestType>::deserialize_value(json);
        match result {
            Err(WireError::Deserialization { type_name, details }) => {
                assert!(type_name.contains("TestType"));
                assert!(details.contains("name"));
            }
            other => panic!("expected Deserialization error for missing field, got: {other:?}"),
        }
    }

    /// A field of the wrong JSON type is reported as a `Deserialization` error.
    #[test]
    fn test_deserialize_type_mismatch() {
        let json = serde_json::json!({
            "id": "123",
            "count": "not a number"
        });

        // Fields only drive deserialization and are never read.
        #[derive(Debug, serde::Deserialize)]
        #[allow(dead_code)]
        struct TestType {
            id: String,
            count: i32,
        }

        let result = TypedJsonStream::<TestType>::deserialize_value(json);
        match result {
            Err(WireError::Deserialization { type_name, details }) => {
                assert!(type_name.contains("TestType"));
                assert!(details.contains("invalid") || details.contains("type"));
            }
            other => panic!("expected Deserialization error for type mismatch, got: {other:?}"),
        }
    }

    /// Decoding into `serde_json::Value` returns the input unchanged.
    #[test]
    fn test_deserialize_value_type() {
        let json = serde_json::json!({
            "id": "123",
            "name": "Test"
        });

        let result = TypedJsonStream::<serde_json::Value>::deserialize_value(json.clone());
        let value =
            result.unwrap_or_else(|e| panic!("expected Ok for Value escape hatch, got: {e}"));
        assert_eq!(value, json);
    }

    /// The `PhantomData` marker is zero-sized, so the wrapper must be exactly
    /// as large as the boxed stream it holds.
    #[test]
    fn test_phantom_data_has_no_size() {
        use std::mem::size_of;

        let boxed_stream_size = size_of::<Box<dyn Stream<Item = Result<Value>> + Unpin>>();
        let typed_stream_size = size_of::<TypedJsonStream<serde_json::Value>>();

        assert_eq!(
            typed_stream_size, boxed_stream_size,
            "PhantomData added size: {} vs {}",
            typed_stream_size, boxed_stream_size
        );
    }

    /// Polling decodes good rows, turns undecodable rows into `Deserialization`
    /// errors, passes upstream errors through untouched, and then ends.
    #[test]
    fn test_poll_next_decodes_and_forwards() {
        #[derive(Debug, serde::Deserialize)]
        struct Row {
            id: String,
        }

        let upstream: Vec<Result<Value>> = vec![
            Ok(serde_json::json!({ "id": "a" })),
            Ok(serde_json::json!({ "id": 7 })),
            Err(WireError::Deserialization {
                type_name: "upstream".to_string(),
                details: "boom".to_string(),
            }),
        ];
        let mut stream: TypedJsonStream<Row> =
            TypedJsonStream::new(Box::new(futures::stream::iter(upstream)));

        // The upstream is always ready, so a waker that does nothing suffices.
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        match stream.poll_next_unpin(&mut cx) {
            Poll::Ready(Some(Ok(row))) => assert_eq!(row.id, "a"),
            other => panic!("expected first row to decode, got: {other:?}"),
        }

        match stream.poll_next_unpin(&mut cx) {
            Poll::Ready(Some(Err(WireError::Deserialization { type_name, .. }))) => {
                assert!(type_name.contains("Row"));
            }
            other => panic!("expected decode failure for second row, got: {other:?}"),
        }

        match stream.poll_next_unpin(&mut cx) {
            Poll::Ready(Some(Err(WireError::Deserialization { type_name, details }))) => {
                assert_eq!(type_name, "upstream");
                assert_eq!(details, "boom");
            }
            other => panic!("expected upstream error to pass through, got: {other:?}"),
        }

        assert!(matches!(stream.poll_next_unpin(&mut cx), Poll::Ready(None)));
    }
}