fraiseql_wire/client/query_builder.rs
1//! Query builder API
2//!
3//! Generic query builder that supports automatic JSON deserialization to target types.
4//!
5//! **IMPORTANT**: Type T is **consumer-side only**.
6//!
7//! Type T does NOT affect:
//! - SQL generation (`SELECT data FROM {entity}` by default, or `SELECT {projection} as data` when `select_projection` is used)
9//! - Filtering (`where_sql`, `where_rust`, `order_by`)
10//! - Wire protocol (identical for all T)
11//!
12//! Type T ONLY affects:
13//! - Consumer-side deserialization at `poll_next()`
14//! - Error messages (type name included)
15
16use crate::client::FraiseClient;
17use crate::stream::QueryStream;
18use crate::Result;
19use serde::de::DeserializeOwned;
20use serde_json::Value;
21use std::marker::PhantomData;
22
/// Type alias for a Rust-side predicate function.
///
/// The boxed closure borrows a [`serde_json::Value`] and returns a `bool` verdict.
/// It must be `Send`; the builder stores at most one such predicate at a time.
type RustPredicate = Box<dyn Fn(&Value) -> bool + Send>;
25
/// Generic query builder
///
/// Accumulates the parts of a single-entity query (SQL predicates, an optional
/// Rust-side predicate, ordering, paging, chunking and memory limits) and is consumed
/// by `execute()` to start the stream.
///
/// The type parameter T controls consumer-side deserialization only.
/// Default type T = `serde_json::Value` for backward compatibility.
///
/// # Examples
///
/// Type-safe query (recommended):
/// ```no_run
/// // Requires: live Postgres connection via FraiseClient.
/// use serde::Deserialize;
///
/// #[derive(Deserialize)]
/// struct Project {
///     id: String,
///     name: String,
/// }
/// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
/// let stream = client.query::<Project>("projects")
///     .where_sql("status='active'")
///     .execute()
///     .await?;
/// # Ok(())
/// # }
/// ```
///
/// Raw JSON query (debugging, forward compatibility):
/// ```no_run
/// // Requires: live Postgres connection via FraiseClient.
/// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
/// let stream = client.query::<serde_json::Value>("projects")
///     .execute()
///     .await?;
/// # Ok(())
/// # }
/// ```
pub struct QueryBuilder<T: DeserializeOwned + Unpin + 'static = serde_json::Value> {
    /// Client the query is executed on.
    client: FraiseClient,
    /// Entity name placed after `FROM` in the generated SQL.
    entity: String,
    /// Raw SQL fragments combined into the `WHERE` clause.
    sql_predicates: Vec<String>,
    /// Optional Rust-side predicate handed to the resulting `QueryStream`.
    rust_predicate: Option<RustPredicate>,
    /// Raw `ORDER BY` expression, if any.
    order_by: Option<String>,
    /// Row count for the SQL `LIMIT` clause, if any.
    limit: Option<usize>,
    /// Row count for the SQL `OFFSET` clause, if any.
    offset: Option<usize>,
    /// Chunk size passed to the client when the query is executed (256 by default).
    chunk_size: usize,
    /// Hard memory limit in bytes; `None` means unbounded.
    max_memory: Option<usize>,
    /// Percentage (0.0-1.0) at which to warn.
    soft_limit_warn_threshold: Option<f32>,
    /// Percentage (0.0-1.0) at which to error.
    soft_limit_fail_threshold: Option<f32>,
    // NOTE(review): the three adaptive-chunking fields below are written by the builder
    // methods but are not read anywhere in this file — confirm where they are consumed.
    /// Whether adaptive chunk sizing is requested (enabled by default).
    enable_adaptive_chunking: bool,
    /// Lower bound override for adaptive chunk sizing.
    adaptive_min_chunk_size: Option<usize>,
    /// Upper bound override for adaptive chunk sizing.
    adaptive_max_chunk_size: Option<usize>,
    /// Optional custom SELECT clause for SQL projection.
    custom_select: Option<String>,
    /// Ties the builder to `T` without storing a value of that type.
    _phantom: PhantomData<T>,
}
80
81impl<T: DeserializeOwned + Unpin + 'static> QueryBuilder<T> {
82 /// Create new query builder
83 pub(crate) fn new(client: FraiseClient, entity: impl Into<String>) -> Self {
84 Self {
85 client,
86 entity: entity.into(),
87 sql_predicates: Vec::new(),
88 rust_predicate: None,
89 order_by: None,
90 limit: None,
91 offset: None,
92 chunk_size: 256,
93 max_memory: None,
94 soft_limit_warn_threshold: None,
95 soft_limit_fail_threshold: None,
96 enable_adaptive_chunking: true, // Enabled by default
97 adaptive_min_chunk_size: None,
98 adaptive_max_chunk_size: None,
99 custom_select: None,
100 _phantom: PhantomData,
101 }
102 }
103
104 /// Add SQL WHERE clause predicate
105 ///
106 /// Type T does NOT affect SQL generation.
107 /// Multiple predicates are AND'ed together.
108 pub fn where_sql(mut self, predicate: impl Into<String>) -> Self {
109 self.sql_predicates.push(predicate.into());
110 self
111 }
112
113 /// Add Rust-side predicate
114 ///
115 /// Type T does NOT affect filtering.
116 /// Applied after SQL filtering, runs on streamed JSON values.
117 /// Predicates receive &`serde_json::Value` regardless of T.
118 pub fn where_rust<F>(mut self, predicate: F) -> Self
119 where
120 F: Fn(&Value) -> bool + Send + 'static,
121 {
122 self.rust_predicate = Some(Box::new(predicate));
123 self
124 }
125
126 /// Set ORDER BY clause
127 ///
128 /// Type T does NOT affect ordering.
129 pub fn order_by(mut self, order: impl Into<String>) -> Self {
130 self.order_by = Some(order.into());
131 self
132 }
133
134 /// Set a custom SELECT clause for SQL projection optimization
135 ///
136 /// When provided, this replaces the default `SELECT data` with a projection SQL
137 /// that filters fields at the database level, reducing network payload.
138 ///
139 /// The projection SQL will be wrapped as `SELECT {projection_sql} as data` to maintain
140 /// the hard invariant of a single `data` column.
141 ///
142 /// This feature enables architectural consistency with PostgreSQL optimization
143 /// and prepares for future performance improvements.
144 ///
145 /// # Arguments
146 ///
147 /// * `projection_sql` - PostgreSQL expression, typically from `jsonb_build_object()`
148 ///
149 /// # Example
150 ///
151 /// ```no_run
152 /// // Requires: live Postgres connection via FraiseClient.
153 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
154 /// # use serde::Deserialize;
155 /// # #[derive(Deserialize)] struct Project { id: String, name: String }
156 /// let stream = client
157 /// .query::<Project>("projects")
158 /// .select_projection("jsonb_build_object('id', data->>'id', 'name', data->>'name')")
159 /// .execute()
160 /// .await?;
161 /// # Ok(())
162 /// # }
163 /// ```
164 ///
165 /// # Backward Compatibility
166 ///
167 /// If not specified, defaults to `SELECT data` (original behavior).
168 pub fn select_projection(mut self, projection_sql: impl Into<String>) -> Self {
169 self.custom_select = Some(projection_sql.into());
170 self
171 }
172
173 /// Set LIMIT clause to restrict result set size
174 ///
175 /// # Example
176 ///
177 /// ```no_run
178 /// // Requires: live Postgres connection via FraiseClient.
179 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
180 /// # use serde::Deserialize;
181 /// # #[derive(Deserialize)] struct Project { id: String }
182 /// let stream = client.query::<Project>("projects")
183 /// .limit(10)
184 /// .execute()
185 /// .await?;
186 /// # Ok(())
187 /// # }
188 /// ```
189 pub const fn limit(mut self, count: usize) -> Self {
190 self.limit = Some(count);
191 self
192 }
193
194 /// Set OFFSET clause to skip first N rows
195 ///
196 /// # Example
197 ///
198 /// ```no_run
199 /// // Requires: live Postgres connection via FraiseClient.
200 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
201 /// # use serde::Deserialize;
202 /// # #[derive(Deserialize)] struct Project { id: String }
203 /// let stream = client.query::<Project>("projects")
204 /// .limit(10)
205 /// .offset(20) // Skip first 20, return next 10
206 /// .execute()
207 /// .await?;
208 /// # Ok(())
209 /// # }
210 /// ```
211 pub const fn offset(mut self, count: usize) -> Self {
212 self.offset = Some(count);
213 self
214 }
215
216 /// Set chunk size (default: 256)
217 pub const fn chunk_size(mut self, size: usize) -> Self {
218 self.chunk_size = size;
219 self
220 }
221
222 /// Set maximum memory limit for buffered items (default: unbounded)
223 ///
224 /// When the estimated memory usage of buffered items exceeds this limit,
225 /// the stream will return `Error::MemoryLimitExceeded` instead of additional items.
226 ///
227 /// Memory is estimated as: `items_buffered * 2048 bytes` (conservative for typical JSON).
228 ///
229 /// By default, `max_memory()` is None (unbounded), maintaining backward compatibility.
230 /// Only set if you need hard memory bounds.
231 ///
232 /// # Example
233 ///
234 /// ```no_run
235 /// // Requires: live Postgres connection via FraiseClient.
236 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
237 /// # use serde::Deserialize;
238 /// # #[derive(Deserialize)] struct Project { id: String }
239 /// let stream = client
240 /// .query::<Project>("projects")
241 /// .max_memory(500_000_000) // 500 MB limit
242 /// .execute()
243 /// .await?;
244 /// # Ok(())
245 /// # }
246 /// ```
247 ///
248 /// # Interpretation
249 ///
250 /// If memory limit is exceeded:
251 /// - It indicates the consumer is too slow relative to data arrival
252 /// - The error is terminal (non-retriable) — retrying won't help
253 /// - Consider: increasing consumer throughput, reducing `chunk_size`, or removing limit
254 pub const fn max_memory(mut self, bytes: usize) -> Self {
255 self.max_memory = Some(bytes);
256 self
257 }
258
259 /// Set soft memory limit thresholds for progressive degradation
260 ///
261 /// Allows warning at a threshold before hitting hard limit.
262 /// Only applies if `max_memory()` is also set.
263 ///
264 /// # Parameters
265 ///
266 /// - `warn_threshold`: Percentage (0.0-1.0) at which to emit a warning
267 /// - `fail_threshold`: Percentage (0.0-1.0) at which to return error (must be > `warn_threshold`)
268 ///
269 /// # Example
270 ///
271 /// ```no_run
272 /// // Requires: live Postgres connection via FraiseClient.
273 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
274 /// # use serde::Deserialize;
275 /// # #[derive(Deserialize)] struct Project { id: String }
276 /// let stream = client
277 /// .query::<Project>("projects")
278 /// .max_memory(500_000_000) // 500 MB hard limit
279 /// .memory_soft_limits(0.80, 1.0) // Warn at 80%, error at 100%
280 /// .execute()
281 /// .await?;
282 /// # Ok(())
283 /// # }
284 /// ```
285 ///
286 /// If only hard limit needed, skip this and just use `max_memory()`.
287 pub fn memory_soft_limits(mut self, warn_threshold: f32, fail_threshold: f32) -> Self {
288 // Validate thresholds
289 let warn = warn_threshold.clamp(0.0, 1.0);
290 let fail = fail_threshold.clamp(0.0, 1.0);
291
292 if warn < fail {
293 self.soft_limit_warn_threshold = Some(warn);
294 self.soft_limit_fail_threshold = Some(fail);
295 }
296 self
297 }
298
299 /// Enable or disable adaptive chunk sizing (default: enabled)
300 ///
301 /// Adaptive chunking automatically adjusts `chunk_size` based on channel occupancy:
302 /// - High occupancy (>80%): Decreases chunk size to reduce producer pressure
303 /// - Low occupancy (<20%): Increases chunk size to optimize batching efficiency
304 ///
305 /// Enabled by default for zero-configuration self-tuning.
306 /// Disable if you need fixed chunk sizes or encounter unexpected behavior.
307 ///
308 /// # Example
309 ///
310 /// ```no_run
311 /// // Requires: live Postgres connection via FraiseClient.
312 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
313 /// # use serde::Deserialize;
314 /// # #[derive(Deserialize)] struct Project { id: String }
315 /// let stream = client
316 /// .query::<Project>("projects")
317 /// .adaptive_chunking(false) // Disable adaptive tuning
318 /// .chunk_size(512) // Use fixed size
319 /// .execute()
320 /// .await?;
321 /// # Ok(())
322 /// # }
323 /// ```
324 pub const fn adaptive_chunking(mut self, enabled: bool) -> Self {
325 self.enable_adaptive_chunking = enabled;
326 self
327 }
328
329 /// Override minimum chunk size for adaptive tuning (default: 16)
330 ///
331 /// Adaptive chunking will never decrease chunk size below this value.
332 /// Useful if you need minimum batching for performance.
333 ///
334 /// Only applies if adaptive chunking is enabled.
335 ///
336 /// # Example
337 ///
338 /// ```no_run
339 /// // Requires: live Postgres connection via FraiseClient.
340 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
341 /// # use serde::Deserialize;
342 /// # #[derive(Deserialize)] struct Project { id: String }
343 /// let stream = client
344 /// .query::<Project>("projects")
345 /// .adaptive_chunking(true)
346 /// .adaptive_min_size(32) // Don't go below 32 items per batch
347 /// .execute()
348 /// .await?;
349 /// # Ok(())
350 /// # }
351 /// ```
352 pub const fn adaptive_min_size(mut self, size: usize) -> Self {
353 self.adaptive_min_chunk_size = Some(size);
354 self
355 }
356
357 /// Override maximum chunk size for adaptive tuning (default: 1024)
358 ///
359 /// Adaptive chunking will never increase chunk size above this value.
360 /// Useful if you need memory bounds or latency guarantees.
361 ///
362 /// Only applies if adaptive chunking is enabled.
363 ///
364 /// # Example
365 ///
366 /// ```no_run
367 /// // Requires: live Postgres connection via FraiseClient.
368 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
369 /// # use serde::Deserialize;
370 /// # #[derive(Deserialize)] struct Project { id: String }
371 /// let stream = client
372 /// .query::<Project>("projects")
373 /// .adaptive_chunking(true)
374 /// .adaptive_max_size(512) // Cap at 512 items per batch
375 /// .execute()
376 /// .await?;
377 /// # Ok(())
378 /// # }
379 /// ```
380 pub const fn adaptive_max_size(mut self, size: usize) -> Self {
381 self.adaptive_max_chunk_size = Some(size);
382 self
383 }
384
385 /// Execute query and return typed stream
386 ///
387 /// Type T ONLY affects consumer-side deserialization at `poll_next()`.
388 /// SQL, filtering, ordering, and wire protocol are identical regardless of T.
389 ///
390 /// The returned stream supports pause/resume/stats for advanced stream control.
391 ///
392 /// # Examples
393 ///
394 /// With type-safe deserialization:
395 /// ```no_run
396 /// // Requires: live Postgres connection via FraiseClient.
397 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
398 /// # use serde::Deserialize;
399 /// # #[derive(Deserialize)] struct Project { id: String }
400 /// # use futures::stream::StreamExt;
401 /// let mut stream = client.query::<Project>("projects").execute().await?;
402 /// while let Some(result) = stream.next().await {
403 /// let project: Project = result?;
404 /// }
405 /// # Ok(())
406 /// # }
407 /// ```
408 ///
409 /// With raw JSON (escape hatch):
410 /// ```no_run
411 /// // Requires: live Postgres connection via FraiseClient.
412 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
413 /// # use futures::stream::StreamExt;
414 /// let mut stream = client.query::<serde_json::Value>("projects").execute().await?;
415 /// while let Some(result) = stream.next().await {
416 /// let json: serde_json::Value = result?;
417 /// }
418 /// # Ok(())
419 /// # }
420 /// ```
421 ///
422 /// With stream control:
423 /// ```no_run
424 /// // Requires: live Postgres connection via FraiseClient.
425 /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
426 /// let mut stream = client.query::<serde_json::Value>("projects").execute().await?;
427 /// stream.pause().await?; // Pause the stream
428 /// let stats = stream.stats(); // Get statistics
429 /// stream.resume().await?; // Resume the stream
430 /// # Ok(())
431 /// # }
432 /// ```
433 ///
434 /// # Errors
435 ///
436 /// Returns `Error::InvalidSchema` if the SQL query cannot be built from the configured predicates.
437 /// Returns `Error::Io` or `Error::Protocol` if the streaming query fails to start.
438 /// Returns `Error::Sql` if the database rejects the query.
439 pub async fn execute(self) -> Result<QueryStream<T>> {
440 let sql = self.build_sql()?;
441 tracing::debug!("executing query: {}", sql);
442
443 // Record query submission metrics
444 crate::metrics::counters::query_submitted(
445 &self.entity,
446 !self.sql_predicates.is_empty(),
447 self.rust_predicate.is_some(),
448 self.order_by.is_some(),
449 );
450
451 let stream = self
452 .client
453 .execute_query(
454 &sql,
455 self.chunk_size,
456 self.max_memory,
457 self.soft_limit_warn_threshold,
458 self.soft_limit_fail_threshold,
459 )
460 .await?;
461
462 // Create QueryStream with optional Rust predicate
463 Ok(QueryStream::new(stream, self.rust_predicate))
464 }
465
466 /// Build SQL query
467 fn build_sql(&self) -> Result<String> {
468 // Use custom SELECT clause if provided, otherwise default to "SELECT data"
469 let select_clause = if let Some(ref projection) = self.custom_select {
470 format!("SELECT {} as data", projection)
471 } else {
472 "SELECT data".to_string()
473 };
474
475 // SAFETY: self.entity is schema-derived (from CompiledSchema, validated at compile
476 // time), not user input.
477 let mut sql = format!("{} FROM {}", select_clause, self.entity);
478
479 if !self.sql_predicates.is_empty() {
480 sql.push_str(" WHERE ");
481 sql.push_str(&self.sql_predicates.join(" AND "));
482 }
483
484 if let Some(ref order) = self.order_by {
485 sql.push_str(" ORDER BY ");
486 sql.push_str(order);
487 }
488
489 if let Some(limit) = self.limit {
490 sql.push_str(&format!(" LIMIT {}", limit));
491 }
492
493 if let Some(offset) = self.offset {
494 sql.push_str(&format!(" OFFSET {}", offset));
495 }
496
497 Ok(sql)
498 }
499}
500
#[cfg(test)]
mod tests {
    /// Base statement shared by the string-assembly tests.
    const BASE_USER_QUERY: &str = "SELECT data FROM user";

    /// Projection expression reused by the single-field projection tests.
    const ID_PROJECTION: &str = "jsonb_build_object('id', data->>'id')";

    /// Assemble `SELECT data FROM {entity}` plus optional WHERE / ORDER BY clauses.
    fn build_test_sql(entity: &str, predicates: Vec<&str>, order_by: Option<&str>) -> String {
        let where_clause = if predicates.is_empty() {
            String::new()
        } else {
            format!(" WHERE {}", predicates.join(" AND "))
        };
        let order_clause = order_by
            .map(|order| format!(" ORDER BY {}", order))
            .unwrap_or_default();
        format!("SELECT data FROM {}{}{}", entity, where_clause, order_clause)
    }

    /// Wrap `projection` as `SELECT {projection} as data FROM {entity}`.
    fn projected_query(projection: &str, entity: &str) -> String {
        format!("SELECT {} as data FROM {}", projection, entity)
    }

    /// Assert that every fragment occurs somewhere in `sql`.
    fn assert_contains_all(sql: &str, fragments: &[&str]) {
        for fragment in fragments {
            assert!(
                sql.contains(*fragment),
                "missing fragment `{}` in `{}`",
                fragment,
                sql
            );
        }
    }

    #[test]
    fn test_build_sql_simple() {
        assert_eq!(build_test_sql("user", Vec::new(), None), BASE_USER_QUERY);
    }

    #[test]
    fn test_build_sql_with_where() {
        let expected = "SELECT data FROM user WHERE data->>'status' = 'active'";
        let actual = build_test_sql("user", vec!["data->>'status' = 'active'"], None);
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_build_sql_with_order() {
        let expected = "SELECT data FROM user ORDER BY data->>'name' ASC";
        let actual = build_test_sql("user", Vec::new(), Some("data->>'name' ASC"));
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_build_sql_with_limit() {
        let sql = [BASE_USER_QUERY, " LIMIT 10"].concat();
        assert_eq!(sql, "SELECT data FROM user LIMIT 10");
    }

    #[test]
    fn test_build_sql_with_offset() {
        let sql = [BASE_USER_QUERY, " OFFSET 20"].concat();
        assert_eq!(sql, "SELECT data FROM user OFFSET 20");
    }

    #[test]
    fn test_build_sql_with_limit_and_offset() {
        let sql = [BASE_USER_QUERY, " LIMIT 10", " OFFSET 20"].concat();
        assert_eq!(sql, "SELECT data FROM user LIMIT 10 OFFSET 20");
    }

    #[test]
    fn test_build_sql_complete() {
        let sql = [
            BASE_USER_QUERY,
            " WHERE data->>'status' = 'active'",
            " ORDER BY data->>'name' ASC",
            " LIMIT 10",
            " OFFSET 20",
        ]
        .concat();
        assert_eq!(
            sql,
            "SELECT data FROM user WHERE data->>'status' = 'active' ORDER BY data->>'name' ASC LIMIT 10 OFFSET 20"
        );
    }

    // Projection tests
    #[test]
    fn test_build_sql_default_select() {
        let sql = build_test_sql("users", Vec::new(), None);
        assert!(sql.starts_with("SELECT data FROM"));
        assert_eq!(sql, "SELECT data FROM users");
    }

    #[test]
    fn test_projection_single_field() {
        let sql = projected_query(ID_PROJECTION, "users");
        assert!(sql.starts_with("SELECT jsonb_build_object("));
        assert_contains_all(&sql, &["as data", "FROM users"]);
    }

    #[test]
    fn test_projection_multiple_fields() {
        let projection =
            "jsonb_build_object('id', data->>'id', 'name', data->>'name', 'email', data->>'email')";
        let sql = projected_query(projection, "users");
        assert_contains_all(
            &sql,
            &[
                "as data FROM users",
                "jsonb_build_object(",
                "'id'",
                "'name'",
                "'email'",
            ],
        );
    }

    #[test]
    fn test_projection_with_where_clause() {
        let sql = projected_query(ID_PROJECTION, "users") + " WHERE data->>'status' = 'active'";
        assert_contains_all(
            &sql,
            &[
                "SELECT jsonb_build_object(",
                "as data FROM users",
                "WHERE data->>'status' = 'active'",
            ],
        );
    }

    #[test]
    fn test_projection_with_order_by() {
        let sql = projected_query(ID_PROJECTION, "users") + " ORDER BY data->>'name' ASC";
        assert_contains_all(
            &sql,
            &["SELECT jsonb_build_object(", "ORDER BY data->>'name' ASC"],
        );
    }

    #[test]
    fn test_projection_with_limit() {
        let sql = projected_query(ID_PROJECTION, "users") + " LIMIT 1000";
        assert_contains_all(&sql, &["as data FROM users", "LIMIT 1000"]);
    }

    #[test]
    fn test_projection_with_offset() {
        let sql = projected_query(ID_PROJECTION, "users") + " OFFSET 500";
        assert_contains_all(&sql, &["as data FROM users", "OFFSET 500"]);
    }

    #[test]
    fn test_projection_full_pipeline() {
        let projection =
            "jsonb_build_object('user_id', data->>'user_id', 'event_type', data->>'event_type')";
        let sql = [
            projected_query(projection, "events").as_str(),
            " WHERE event_type IN ('purchase', 'view')",
            " ORDER BY timestamp DESC",
            " LIMIT 5000",
        ]
        .concat();
        assert_contains_all(
            &sql,
            &[
                "SELECT jsonb_build_object(",
                "'user_id'",
                "'event_type'",
                "as data FROM events",
                "WHERE event_type IN ('purchase', 'view')",
                "ORDER BY timestamp DESC",
                "LIMIT 5000",
            ],
        );
    }

    // Stream pipeline integration tests
    #[test]
    fn test_typed_stream_with_value_type() {
        // Compile-time check: TypedJsonStream can wrap a raw JSON stream.
        use crate::stream::TypedJsonStream;
        use futures::stream;
        use serde_json::json;

        let rows = vec![
            Ok(json!({"id": "1", "name": "Alice"})),
            Ok(json!({"id": "2", "name": "Bob"})),
        ];

        let raw = stream::iter(rows);
        let typed: TypedJsonStream<serde_json::Value> = TypedJsonStream::new(Box::new(raw));

        // Erasing to a boxed Stream proves the item type is Result<Value>.
        let _erased: Box<
            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Unpin,
        > = Box::new(typed);
    }

    #[test]
    fn test_filtered_stream_with_typed_output() {
        // Compile-time check: FilteredStream can sit in front of TypedJsonStream.
        use crate::stream::{FilteredStream, TypedJsonStream};
        use futures::stream;
        use serde_json::json;

        let rows = vec![
            Ok(json!({"id": 1, "active": true})),
            Ok(json!({"id": 2, "active": false})),
            Ok(json!({"id": 3, "active": true})),
        ];

        let raw = stream::iter(rows);
        let only_active = Box::new(|v: &serde_json::Value| v["active"].as_bool().unwrap_or(false));

        let filtered = FilteredStream::new(raw, only_active);
        let typed: TypedJsonStream<serde_json::Value> = TypedJsonStream::new(Box::new(filtered));

        // Erasing to a boxed Stream proves the whole pipeline type-checks.
        let _erased: Box<
            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Unpin,
        > = Box::new(typed);
    }

    #[test]
    fn test_stream_pipeline_type_flow() {
        // End-to-end type compatibility:
        // JsonStream (Result<Value>) → FilteredStream (Result<Value>) → TypedJsonStream<T> (Result<T>)
        use crate::stream::{FilteredStream, TypedJsonStream};
        use futures::stream;
        use serde::Deserialize;
        use serde_json::json;

        #[derive(Deserialize, Debug)]
        #[allow(dead_code)] // Reason: test fixture struct; fields read only by serde deserialization verification
        struct TestUser {
            id: String,
            active: bool,
        }

        let rows = vec![
            Ok(json!({"id": "1", "active": true})),
            Ok(json!({"id": "2", "active": false})),
        ];

        let raw = stream::iter(rows);

        // Stage 1: filter the raw JSON values.
        let only_active: Box<dyn Fn(&serde_json::Value) -> bool + Send> =
            Box::new(|v| v["active"].as_bool().unwrap_or(false));
        let filtered: Box<
            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Send + Unpin,
        > = Box::new(FilteredStream::new(raw, only_active));

        // Stage 2: deserialize the surviving values into TestUser.
        let typed: TypedJsonStream<TestUser> = TypedJsonStream::new(filtered);

        // What the assignments above and below establish:
        // - FilteredStream outputs Result<Value>
        // - TypedJsonStream<T> takes Box<dyn Stream<Item = Result<Value>>>
        // - TypedJsonStream<T> outputs Result<T>
        let _erased: Box<dyn futures::stream::Stream<Item = crate::Result<TestUser>> + Unpin> =
            Box::new(typed);
    }
}