Skip to main content

fraiseql_wire/client/
query_builder.rs

1//! Query builder API
2//!
3//! Generic query builder that supports automatic JSON deserialization to target types.
4//!
5//! **IMPORTANT**: Type T is **consumer-side only**.
6//!
7//! Type T does NOT affect:
8//! - SQL generation (always `SELECT data FROM {entity}`)
9//! - Filtering (`where_sql`, `where_rust`, `order_by`)
10//! - Wire protocol (identical for all T)
11//!
12//! Type T ONLY affects:
13//! - Consumer-side deserialization at `poll_next()`
14//! - Error messages (type name included)
15
16use crate::client::FraiseClient;
17use crate::stream::QueryStream;
18use crate::Result;
19use serde::de::DeserializeOwned;
20use serde_json::Value;
21use std::marker::PhantomData;
22
23/// Type alias for a Rust-side predicate function
24type RustPredicate = Box<dyn Fn(&Value) -> bool + Send>;
25
26/// Generic query builder
27///
28/// The type parameter T controls consumer-side deserialization only.
29/// Default type T = `serde_json::Value` for backward compatibility.
30///
31/// # Examples
32///
33/// Type-safe query (recommended):
34/// ```no_run
35/// // Requires: live Postgres connection via FraiseClient.
36/// use serde::Deserialize;
37///
38/// #[derive(Deserialize)]
39/// struct Project {
40///     id: String,
41///     name: String,
42/// }
43/// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
44/// let stream = client.query::<Project>("projects")
45///     .where_sql("status='active'")
46///     .execute()
47///     .await?;
48/// # Ok(())
49/// # }
50/// ```
51///
52/// Raw JSON query (debugging, forward compatibility):
53/// ```no_run
54/// // Requires: live Postgres connection via FraiseClient.
55/// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
56/// let stream = client.query::<serde_json::Value>("projects")
57///     .execute()
58///     .await?;
59/// # Ok(())
60/// # }
61/// ```
62pub struct QueryBuilder<T: DeserializeOwned + Unpin + 'static = serde_json::Value> {
63    client: FraiseClient,
64    entity: String,
65    sql_predicates: Vec<String>,
66    rust_predicate: Option<RustPredicate>,
67    order_by: Option<String>,
68    limit: Option<usize>,
69    offset: Option<usize>,
70    chunk_size: usize,
71    max_memory: Option<usize>,
72    soft_limit_warn_threshold: Option<f32>, // Percentage (0.0-1.0) at which to warn
73    soft_limit_fail_threshold: Option<f32>, // Percentage (0.0-1.0) at which to error
74    enable_adaptive_chunking: bool,
75    adaptive_min_chunk_size: Option<usize>,
76    adaptive_max_chunk_size: Option<usize>,
77    custom_select: Option<String>, // Optional custom SELECT clause for SQL projection
78    _phantom: PhantomData<T>,
79}
80
81impl<T: DeserializeOwned + Unpin + 'static> QueryBuilder<T> {
82    /// Create new query builder
83    pub(crate) fn new(client: FraiseClient, entity: impl Into<String>) -> Self {
84        Self {
85            client,
86            entity: entity.into(),
87            sql_predicates: Vec::new(),
88            rust_predicate: None,
89            order_by: None,
90            limit: None,
91            offset: None,
92            chunk_size: 256,
93            max_memory: None,
94            soft_limit_warn_threshold: None,
95            soft_limit_fail_threshold: None,
96            enable_adaptive_chunking: true, // Enabled by default
97            adaptive_min_chunk_size: None,
98            adaptive_max_chunk_size: None,
99            custom_select: None,
100            _phantom: PhantomData,
101        }
102    }
103
104    /// Add SQL WHERE clause predicate
105    ///
106    /// Type T does NOT affect SQL generation.
107    /// Multiple predicates are AND'ed together.
108    pub fn where_sql(mut self, predicate: impl Into<String>) -> Self {
109        self.sql_predicates.push(predicate.into());
110        self
111    }
112
113    /// Add Rust-side predicate
114    ///
115    /// Type T does NOT affect filtering.
116    /// Applied after SQL filtering, runs on streamed JSON values.
117    /// Predicates receive &`serde_json::Value` regardless of T.
118    pub fn where_rust<F>(mut self, predicate: F) -> Self
119    where
120        F: Fn(&Value) -> bool + Send + 'static,
121    {
122        self.rust_predicate = Some(Box::new(predicate));
123        self
124    }
125
126    /// Set ORDER BY clause
127    ///
128    /// Type T does NOT affect ordering.
129    pub fn order_by(mut self, order: impl Into<String>) -> Self {
130        self.order_by = Some(order.into());
131        self
132    }
133
134    /// Set a custom SELECT clause for SQL projection optimization
135    ///
136    /// When provided, this replaces the default `SELECT data` with a projection SQL
137    /// that filters fields at the database level, reducing network payload.
138    ///
139    /// The projection SQL will be wrapped as `SELECT {projection_sql} as data` to maintain
140    /// the hard invariant of a single `data` column.
141    ///
142    /// This feature enables architectural consistency with PostgreSQL optimization
143    /// and prepares for future performance improvements.
144    ///
145    /// # Arguments
146    ///
147    /// * `projection_sql` - PostgreSQL expression, typically from `jsonb_build_object()`
148    ///
149    /// # Example
150    ///
151    /// ```no_run
152    /// // Requires: live Postgres connection via FraiseClient.
153    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
154    /// # use serde::Deserialize;
155    /// # #[derive(Deserialize)] struct Project { id: String, name: String }
156    /// let stream = client
157    ///     .query::<Project>("projects")
158    ///     .select_projection("jsonb_build_object('id', data->>'id', 'name', data->>'name')")
159    ///     .execute()
160    ///     .await?;
161    /// # Ok(())
162    /// # }
163    /// ```
164    ///
165    /// # Backward Compatibility
166    ///
167    /// If not specified, defaults to `SELECT data` (original behavior).
168    pub fn select_projection(mut self, projection_sql: impl Into<String>) -> Self {
169        self.custom_select = Some(projection_sql.into());
170        self
171    }
172
173    /// Set LIMIT clause to restrict result set size
174    ///
175    /// # Example
176    ///
177    /// ```no_run
178    /// // Requires: live Postgres connection via FraiseClient.
179    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
180    /// # use serde::Deserialize;
181    /// # #[derive(Deserialize)] struct Project { id: String }
182    /// let stream = client.query::<Project>("projects")
183    ///     .limit(10)
184    ///     .execute()
185    ///     .await?;
186    /// # Ok(())
187    /// # }
188    /// ```
189    pub const fn limit(mut self, count: usize) -> Self {
190        self.limit = Some(count);
191        self
192    }
193
194    /// Set OFFSET clause to skip first N rows
195    ///
196    /// # Example
197    ///
198    /// ```no_run
199    /// // Requires: live Postgres connection via FraiseClient.
200    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
201    /// # use serde::Deserialize;
202    /// # #[derive(Deserialize)] struct Project { id: String }
203    /// let stream = client.query::<Project>("projects")
204    ///     .limit(10)
205    ///     .offset(20)  // Skip first 20, return next 10
206    ///     .execute()
207    ///     .await?;
208    /// # Ok(())
209    /// # }
210    /// ```
211    pub const fn offset(mut self, count: usize) -> Self {
212        self.offset = Some(count);
213        self
214    }
215
216    /// Set chunk size (default: 256)
217    pub const fn chunk_size(mut self, size: usize) -> Self {
218        self.chunk_size = size;
219        self
220    }
221
222    /// Set maximum memory limit for buffered items (default: unbounded)
223    ///
224    /// When the estimated memory usage of buffered items exceeds this limit,
225    /// the stream will return `Error::MemoryLimitExceeded` instead of additional items.
226    ///
227    /// Memory is estimated as: `items_buffered * 2048 bytes` (conservative for typical JSON).
228    ///
229    /// By default, `max_memory()` is None (unbounded), maintaining backward compatibility.
230    /// Only set if you need hard memory bounds.
231    ///
232    /// # Example
233    ///
234    /// ```no_run
235    /// // Requires: live Postgres connection via FraiseClient.
236    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
237    /// # use serde::Deserialize;
238    /// # #[derive(Deserialize)] struct Project { id: String }
239    /// let stream = client
240    ///     .query::<Project>("projects")
241    ///     .max_memory(500_000_000)  // 500 MB limit
242    ///     .execute()
243    ///     .await?;
244    /// # Ok(())
245    /// # }
246    /// ```
247    ///
248    /// # Interpretation
249    ///
250    /// If memory limit is exceeded:
251    /// - It indicates the consumer is too slow relative to data arrival
252    /// - The error is terminal (non-retriable) — retrying won't help
253    /// - Consider: increasing consumer throughput, reducing `chunk_size`, or removing limit
254    pub const fn max_memory(mut self, bytes: usize) -> Self {
255        self.max_memory = Some(bytes);
256        self
257    }
258
259    /// Set soft memory limit thresholds for progressive degradation
260    ///
261    /// Allows warning at a threshold before hitting hard limit.
262    /// Only applies if `max_memory()` is also set.
263    ///
264    /// # Parameters
265    ///
266    /// - `warn_threshold`: Percentage (0.0-1.0) at which to emit a warning
267    /// - `fail_threshold`: Percentage (0.0-1.0) at which to return error (must be > `warn_threshold`)
268    ///
269    /// # Example
270    ///
271    /// ```no_run
272    /// // Requires: live Postgres connection via FraiseClient.
273    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
274    /// # use serde::Deserialize;
275    /// # #[derive(Deserialize)] struct Project { id: String }
276    /// let stream = client
277    ///     .query::<Project>("projects")
278    ///     .max_memory(500_000_000)  // 500 MB hard limit
279    ///     .memory_soft_limits(0.80, 1.0)  // Warn at 80%, error at 100%
280    ///     .execute()
281    ///     .await?;
282    /// # Ok(())
283    /// # }
284    /// ```
285    ///
286    /// If only hard limit needed, skip this and just use `max_memory()`.
287    pub fn memory_soft_limits(mut self, warn_threshold: f32, fail_threshold: f32) -> Self {
288        // Validate thresholds
289        let warn = warn_threshold.clamp(0.0, 1.0);
290        let fail = fail_threshold.clamp(0.0, 1.0);
291
292        if warn < fail {
293            self.soft_limit_warn_threshold = Some(warn);
294            self.soft_limit_fail_threshold = Some(fail);
295        }
296        self
297    }
298
299    /// Enable or disable adaptive chunk sizing (default: enabled)
300    ///
301    /// Adaptive chunking automatically adjusts `chunk_size` based on channel occupancy:
302    /// - High occupancy (>80%): Decreases chunk size to reduce producer pressure
303    /// - Low occupancy (<20%): Increases chunk size to optimize batching efficiency
304    ///
305    /// Enabled by default for zero-configuration self-tuning.
306    /// Disable if you need fixed chunk sizes or encounter unexpected behavior.
307    ///
308    /// # Example
309    ///
310    /// ```no_run
311    /// // Requires: live Postgres connection via FraiseClient.
312    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
313    /// # use serde::Deserialize;
314    /// # #[derive(Deserialize)] struct Project { id: String }
315    /// let stream = client
316    ///     .query::<Project>("projects")
317    ///     .adaptive_chunking(false)  // Disable adaptive tuning
318    ///     .chunk_size(512)  // Use fixed size
319    ///     .execute()
320    ///     .await?;
321    /// # Ok(())
322    /// # }
323    /// ```
324    pub const fn adaptive_chunking(mut self, enabled: bool) -> Self {
325        self.enable_adaptive_chunking = enabled;
326        self
327    }
328
329    /// Override minimum chunk size for adaptive tuning (default: 16)
330    ///
331    /// Adaptive chunking will never decrease chunk size below this value.
332    /// Useful if you need minimum batching for performance.
333    ///
334    /// Only applies if adaptive chunking is enabled.
335    ///
336    /// # Example
337    ///
338    /// ```no_run
339    /// // Requires: live Postgres connection via FraiseClient.
340    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
341    /// # use serde::Deserialize;
342    /// # #[derive(Deserialize)] struct Project { id: String }
343    /// let stream = client
344    ///     .query::<Project>("projects")
345    ///     .adaptive_chunking(true)
346    ///     .adaptive_min_size(32)  // Don't go below 32 items per batch
347    ///     .execute()
348    ///     .await?;
349    /// # Ok(())
350    /// # }
351    /// ```
352    pub const fn adaptive_min_size(mut self, size: usize) -> Self {
353        self.adaptive_min_chunk_size = Some(size);
354        self
355    }
356
357    /// Override maximum chunk size for adaptive tuning (default: 1024)
358    ///
359    /// Adaptive chunking will never increase chunk size above this value.
360    /// Useful if you need memory bounds or latency guarantees.
361    ///
362    /// Only applies if adaptive chunking is enabled.
363    ///
364    /// # Example
365    ///
366    /// ```no_run
367    /// // Requires: live Postgres connection via FraiseClient.
368    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
369    /// # use serde::Deserialize;
370    /// # #[derive(Deserialize)] struct Project { id: String }
371    /// let stream = client
372    ///     .query::<Project>("projects")
373    ///     .adaptive_chunking(true)
374    ///     .adaptive_max_size(512)  // Cap at 512 items per batch
375    ///     .execute()
376    ///     .await?;
377    /// # Ok(())
378    /// # }
379    /// ```
380    pub const fn adaptive_max_size(mut self, size: usize) -> Self {
381        self.adaptive_max_chunk_size = Some(size);
382        self
383    }
384
385    /// Execute query and return typed stream
386    ///
387    /// Type T ONLY affects consumer-side deserialization at `poll_next()`.
388    /// SQL, filtering, ordering, and wire protocol are identical regardless of T.
389    ///
390    /// The returned stream supports pause/resume/stats for advanced stream control.
391    ///
392    /// # Examples
393    ///
394    /// With type-safe deserialization:
395    /// ```no_run
396    /// // Requires: live Postgres connection via FraiseClient.
397    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
398    /// # use serde::Deserialize;
399    /// # #[derive(Deserialize)] struct Project { id: String }
400    /// # use futures::stream::StreamExt;
401    /// let mut stream = client.query::<Project>("projects").execute().await?;
402    /// while let Some(result) = stream.next().await {
403    ///     let project: Project = result?;
404    /// }
405    /// # Ok(())
406    /// # }
407    /// ```
408    ///
409    /// With raw JSON (escape hatch):
410    /// ```no_run
411    /// // Requires: live Postgres connection via FraiseClient.
412    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
413    /// # use futures::stream::StreamExt;
414    /// let mut stream = client.query::<serde_json::Value>("projects").execute().await?;
415    /// while let Some(result) = stream.next().await {
416    ///     let json: serde_json::Value = result?;
417    /// }
418    /// # Ok(())
419    /// # }
420    /// ```
421    ///
422    /// With stream control:
423    /// ```no_run
424    /// // Requires: live Postgres connection via FraiseClient.
425    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
426    /// let mut stream = client.query::<serde_json::Value>("projects").execute().await?;
427    /// stream.pause().await?;  // Pause the stream
428    /// let stats = stream.stats();  // Get statistics
429    /// stream.resume().await?;  // Resume the stream
430    /// # Ok(())
431    /// # }
432    /// ```
433    ///
434    /// # Errors
435    ///
436    /// Returns `Error::InvalidSchema` if the SQL query cannot be built from the configured predicates.
437    /// Returns `Error::Io` or `Error::Protocol` if the streaming query fails to start.
438    /// Returns `Error::Sql` if the database rejects the query.
439    pub async fn execute(self) -> Result<QueryStream<T>> {
440        let sql = self.build_sql()?;
441        tracing::debug!("executing query: {}", sql);
442
443        // Record query submission metrics
444        crate::metrics::counters::query_submitted(
445            &self.entity,
446            !self.sql_predicates.is_empty(),
447            self.rust_predicate.is_some(),
448            self.order_by.is_some(),
449        );
450
451        let stream = self
452            .client
453            .execute_query(
454                &sql,
455                self.chunk_size,
456                self.max_memory,
457                self.soft_limit_warn_threshold,
458                self.soft_limit_fail_threshold,
459            )
460            .await?;
461
462        // Create QueryStream with optional Rust predicate
463        Ok(QueryStream::new(stream, self.rust_predicate))
464    }
465
466    /// Build SQL query
467    fn build_sql(&self) -> Result<String> {
468        // Use custom SELECT clause if provided, otherwise default to "SELECT data"
469        let select_clause = if let Some(ref projection) = self.custom_select {
470            format!("SELECT {} as data", projection)
471        } else {
472            "SELECT data".to_string()
473        };
474
475        // SAFETY: self.entity is schema-derived (from CompiledSchema, validated at compile
476        // time), not user input.
477        let mut sql = format!("{} FROM {}", select_clause, self.entity);
478
479        if !self.sql_predicates.is_empty() {
480            sql.push_str(" WHERE ");
481            sql.push_str(&self.sql_predicates.join(" AND "));
482        }
483
484        if let Some(ref order) = self.order_by {
485            sql.push_str(" ORDER BY ");
486            sql.push_str(order);
487        }
488
489        if let Some(limit) = self.limit {
490            sql.push_str(&format!(" LIMIT {}", limit));
491        }
492
493        if let Some(offset) = self.offset {
494            sql.push_str(&format!(" OFFSET {}", offset));
495        }
496
497        Ok(sql)
498    }
499}
500
501#[cfg(test)]
502mod tests {
503
504    fn build_test_sql(entity: &str, predicates: Vec<&str>, order_by: Option<&str>) -> String {
505        let mut sql = format!("SELECT data FROM {}", entity);
506        if !predicates.is_empty() {
507            sql.push_str(" WHERE ");
508            sql.push_str(&predicates.join(" AND "));
509        }
510        if let Some(order) = order_by {
511            sql.push_str(" ORDER BY ");
512            sql.push_str(order);
513        }
514        sql
515    }
516
517    #[test]
518    fn test_build_sql_simple() {
519        let sql = build_test_sql("user", vec![], None);
520        assert_eq!(sql, "SELECT data FROM user");
521    }
522
523    #[test]
524    fn test_build_sql_with_where() {
525        let sql = build_test_sql("user", vec!["data->>'status' = 'active'"], None);
526        assert_eq!(
527            sql,
528            "SELECT data FROM user WHERE data->>'status' = 'active'"
529        );
530    }
531
532    #[test]
533    fn test_build_sql_with_order() {
534        let sql = build_test_sql("user", vec![], Some("data->>'name' ASC"));
535        assert_eq!(sql, "SELECT data FROM user ORDER BY data->>'name' ASC");
536    }
537
538    #[test]
539    fn test_build_sql_with_limit() {
540        let mut sql = "SELECT data FROM user".to_string();
541        sql.push_str(" LIMIT 10");
542        assert_eq!(sql, "SELECT data FROM user LIMIT 10");
543    }
544
545    #[test]
546    fn test_build_sql_with_offset() {
547        let mut sql = "SELECT data FROM user".to_string();
548        sql.push_str(" OFFSET 20");
549        assert_eq!(sql, "SELECT data FROM user OFFSET 20");
550    }
551
552    #[test]
553    fn test_build_sql_with_limit_and_offset() {
554        let mut sql = "SELECT data FROM user".to_string();
555        sql.push_str(" LIMIT 10");
556        sql.push_str(" OFFSET 20");
557        assert_eq!(sql, "SELECT data FROM user LIMIT 10 OFFSET 20");
558    }
559
560    #[test]
561    fn test_build_sql_complete() {
562        let mut sql = "SELECT data FROM user".to_string();
563        sql.push_str(" WHERE data->>'status' = 'active'");
564        sql.push_str(" ORDER BY data->>'name' ASC");
565        sql.push_str(" LIMIT 10");
566        sql.push_str(" OFFSET 20");
567        assert_eq!(
568            sql,
569            "SELECT data FROM user WHERE data->>'status' = 'active' ORDER BY data->>'name' ASC LIMIT 10 OFFSET 20"
570        );
571    }
572
573    // Projection tests
574    #[test]
575    fn test_build_sql_default_select() {
576        let sql = build_test_sql("users", vec![], None);
577        assert!(sql.starts_with("SELECT data FROM"));
578        assert_eq!(sql, "SELECT data FROM users");
579    }
580
581    #[test]
582    fn test_projection_single_field() {
583        let sql = "SELECT jsonb_build_object('id', data->>'id') as data FROM users".to_string();
584        assert!(sql.contains("as data"));
585        assert!(sql.starts_with("SELECT jsonb_build_object("));
586        assert!(sql.contains("FROM users"));
587    }
588
589    #[test]
590    fn test_projection_multiple_fields() {
591        let projection =
592            "jsonb_build_object('id', data->>'id', 'name', data->>'name', 'email', data->>'email')";
593        let sql = format!("SELECT {} as data FROM users", projection);
594        assert!(sql.contains("as data FROM users"));
595        assert!(sql.contains("jsonb_build_object("));
596        assert!(sql.contains("'id'"));
597        assert!(sql.contains("'name'"));
598        assert!(sql.contains("'email'"));
599    }
600
601    #[test]
602    fn test_projection_with_where_clause() {
603        let projection = "jsonb_build_object('id', data->>'id')";
604        let mut sql = format!("SELECT {} as data FROM users", projection);
605        sql.push_str(" WHERE data->>'status' = 'active'");
606        assert!(sql.contains("SELECT jsonb_build_object("));
607        assert!(sql.contains("as data FROM users"));
608        assert!(sql.contains("WHERE data->>'status' = 'active'"));
609    }
610
611    #[test]
612    fn test_projection_with_order_by() {
613        let projection = "jsonb_build_object('id', data->>'id')";
614        let mut sql = format!("SELECT {} as data FROM users", projection);
615        sql.push_str(" ORDER BY data->>'name' ASC");
616        assert!(sql.contains("SELECT jsonb_build_object("));
617        assert!(sql.contains("ORDER BY data->>'name' ASC"));
618    }
619
620    #[test]
621    fn test_projection_with_limit() {
622        let projection = "jsonb_build_object('id', data->>'id')";
623        let mut sql = format!("SELECT {} as data FROM users", projection);
624        sql.push_str(" LIMIT 1000");
625        assert!(sql.contains("as data FROM users"));
626        assert!(sql.contains("LIMIT 1000"));
627    }
628
629    #[test]
630    fn test_projection_with_offset() {
631        let projection = "jsonb_build_object('id', data->>'id')";
632        let mut sql = format!("SELECT {} as data FROM users", projection);
633        sql.push_str(" OFFSET 500");
634        assert!(sql.contains("as data FROM users"));
635        assert!(sql.contains("OFFSET 500"));
636    }
637
638    #[test]
639    fn test_projection_full_pipeline() {
640        let projection =
641            "jsonb_build_object('user_id', data->>'user_id', 'event_type', data->>'event_type')";
642        let mut sql = format!("SELECT {} as data FROM events", projection);
643        sql.push_str(" WHERE event_type IN ('purchase', 'view')");
644        sql.push_str(" ORDER BY timestamp DESC");
645        sql.push_str(" LIMIT 5000");
646        assert!(sql.contains("SELECT jsonb_build_object("));
647        assert!(sql.contains("'user_id'"));
648        assert!(sql.contains("'event_type'"));
649        assert!(sql.contains("as data FROM events"));
650        assert!(sql.contains("WHERE event_type IN ('purchase', 'view')"));
651        assert!(sql.contains("ORDER BY timestamp DESC"));
652        assert!(sql.contains("LIMIT 5000"));
653    }
654
655    // Stream pipeline integration tests
656    #[test]
657    fn test_typed_stream_with_value_type() {
658        // Verify that TypedJsonStream can wrap a raw JSON stream
659        use crate::stream::TypedJsonStream;
660        use futures::stream;
661
662        let values = vec![
663            Ok(serde_json::json!({"id": "1", "name": "Alice"})),
664            Ok(serde_json::json!({"id": "2", "name": "Bob"})),
665        ];
666
667        let json_stream = stream::iter(values);
668        let typed_stream: TypedJsonStream<serde_json::Value> =
669            TypedJsonStream::new(Box::new(json_stream));
670
671        // This verifies the stream compiles and has correct type
672        let _stream: Box<
673            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Unpin,
674        > = Box::new(typed_stream);
675    }
676
677    #[test]
678    fn test_filtered_stream_with_typed_output() {
679        // Verify that FilteredStream correctly filters before TypedJsonStream
680        use crate::stream::{FilteredStream, TypedJsonStream};
681        use futures::stream;
682
683        let values = vec![
684            Ok(serde_json::json!({"id": 1, "active": true})),
685            Ok(serde_json::json!({"id": 2, "active": false})),
686            Ok(serde_json::json!({"id": 3, "active": true})),
687        ];
688
689        let json_stream = stream::iter(values);
690        let predicate = Box::new(|v: &serde_json::Value| v["active"].as_bool().unwrap_or(false));
691
692        let filtered = FilteredStream::new(json_stream, predicate);
693        let typed_stream: TypedJsonStream<serde_json::Value> =
694            TypedJsonStream::new(Box::new(filtered));
695
696        // This verifies the full pipeline compiles
697        let _stream: Box<
698            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Unpin,
699        > = Box::new(typed_stream);
700    }
701
702    #[test]
703    fn test_stream_pipeline_type_flow() {
704        // Comprehensive test of stream type compatibility:
705        // JsonStream (Result<Value>) → FilteredStream (Result<Value>) → TypedJsonStream<T> (Result<T>)
706        use crate::stream::{FilteredStream, TypedJsonStream};
707        use futures::stream;
708        use serde::Deserialize;
709
710        #[derive(Deserialize, Debug)]
711        #[allow(dead_code)] // Reason: test fixture struct; fields read only by serde deserialization verification
712        struct TestUser {
713            id: String,
714            active: bool,
715        }
716
717        let values = vec![
718            Ok(serde_json::json!({"id": "1", "active": true})),
719            Ok(serde_json::json!({"id": "2", "active": false})),
720        ];
721
722        let json_stream = stream::iter(values);
723
724        // Step 1: FilteredStream filters JSON values
725        let predicate: Box<dyn Fn(&serde_json::Value) -> bool + Send> =
726            Box::new(|v| v["active"].as_bool().unwrap_or(false));
727        let filtered: Box<
728            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Send + Unpin,
729        > = Box::new(FilteredStream::new(json_stream, predicate));
730
731        // Step 2: TypedJsonStream deserializes to TestUser
732        let typed: TypedJsonStream<TestUser> = TypedJsonStream::new(filtered);
733
734        // This verifies type system is compatible:
735        // - FilteredStream outputs Result<Value>
736        // - TypedJsonStream<T> takes Box<dyn Stream<Item = Result<Value>>>
737        // - TypedJsonStream<T> outputs Result<T>
738        let _final_stream: Box<
739            dyn futures::stream::Stream<Item = crate::Result<TestUser>> + Unpin,
740        > = Box::new(typed);
741    }
742}