Skip to main content

fraiseql_wire/client/
query_builder.rs

//! Query builder API
//!
//! Generic query builder that supports automatic JSON deserialization to target types.
//!
//! **IMPORTANT**: Type T is **consumer-side only**.
//!
//! Type T does NOT affect:
//! - SQL generation (`SELECT data FROM {entity}` by default, or `SELECT {projection} as data`
//!   when `select_projection` is used)
//! - Filtering (`where_sql`, `where_rust`, `order_by`)
//! - Wire protocol (identical for all T)
//!
//! Type T ONLY affects:
//! - Consumer-side deserialization at `poll_next()`
//! - Error messages (type name included)
15
16use crate::client::FraiseClient;
17#[allow(unused_imports)] // Reason: used only in doc links for `# Errors` sections
18use crate::error::WireError;
19use crate::stream::QueryStream;
20use crate::Result;
21use serde::de::DeserializeOwned;
22use serde_json::Value;
23use std::marker::PhantomData;
24
25/// Type alias for a Rust-side predicate function
26type RustPredicate = Box<dyn Fn(&Value) -> bool + Send>;
27
28/// Generic query builder
29///
30/// The type parameter T controls consumer-side deserialization only.
31/// Default type T = `serde_json::Value` for backward compatibility.
32///
33/// # Examples
34///
35/// Type-safe query (recommended):
36/// ```no_run
37/// // Requires: live Postgres connection via FraiseClient.
38/// use serde::Deserialize;
39///
40/// #[derive(Deserialize)]
41/// struct Project {
42///     id: String,
43///     name: String,
44/// }
45/// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
46/// let stream = client.query::<Project>("projects")
47///     .where_sql("status='active'")
48///     .execute()
49///     .await?;
50/// # Ok(())
51/// # }
52/// ```
53///
54/// Raw JSON query (debugging, forward compatibility):
55/// ```no_run
56/// // Requires: live Postgres connection via FraiseClient.
57/// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
58/// let stream = client.query::<serde_json::Value>("projects")
59///     .execute()
60///     .await?;
61/// # Ok(())
62/// # }
63/// ```
64#[must_use = "call .execute() to run the query"]
65pub struct QueryBuilder<T: DeserializeOwned + Unpin + 'static = serde_json::Value> {
66    client: FraiseClient,
67    entity: String,
68    sql_predicates: Vec<String>,
69    rust_predicate: Option<RustPredicate>,
70    order_by: Option<String>,
71    limit: Option<usize>,
72    offset: Option<usize>,
73    chunk_size: usize,
74    max_memory: Option<usize>,
75    soft_limit_warn_threshold: Option<f32>, // Percentage (0.0-1.0) at which to warn
76    soft_limit_fail_threshold: Option<f32>, // Percentage (0.0-1.0) at which to error
77    enable_adaptive_chunking: bool,
78    adaptive_min_chunk_size: Option<usize>,
79    adaptive_max_chunk_size: Option<usize>,
80    custom_select: Option<String>, // Optional custom SELECT clause for SQL projection
81    _phantom: PhantomData<T>,
82}
83
84impl<T: DeserializeOwned + Unpin + 'static> QueryBuilder<T> {
85    /// Create new query builder
86    pub(crate) fn new(client: FraiseClient, entity: impl Into<String>) -> Self {
87        Self {
88            client,
89            entity: entity.into(),
90            sql_predicates: Vec::new(),
91            rust_predicate: None,
92            order_by: None,
93            limit: None,
94            offset: None,
95            chunk_size: 256,
96            max_memory: None,
97            soft_limit_warn_threshold: None,
98            soft_limit_fail_threshold: None,
99            enable_adaptive_chunking: true, // Enabled by default
100            adaptive_min_chunk_size: None,
101            adaptive_max_chunk_size: None,
102            custom_select: None,
103            _phantom: PhantomData,
104        }
105    }
106
107    /// Add SQL WHERE clause predicate
108    ///
109    /// Type T does NOT affect SQL generation.
110    /// Multiple predicates are AND'ed together.
111    pub fn where_sql(mut self, predicate: impl Into<String>) -> Self {
112        self.sql_predicates.push(predicate.into());
113        self
114    }
115
116    /// Add Rust-side predicate
117    ///
118    /// Type T does NOT affect filtering.
119    /// Applied after SQL filtering, runs on streamed JSON values.
120    /// Predicates receive &`serde_json::Value` regardless of T.
121    pub fn where_rust<F>(mut self, predicate: F) -> Self
122    where
123        F: Fn(&Value) -> bool + Send + 'static,
124    {
125        self.rust_predicate = Some(Box::new(predicate));
126        self
127    }
128
129    /// Set ORDER BY clause
130    ///
131    /// Type T does NOT affect ordering.
132    pub fn order_by(mut self, order: impl Into<String>) -> Self {
133        self.order_by = Some(order.into());
134        self
135    }
136
137    /// Set a custom SELECT clause for SQL projection optimization
138    ///
139    /// When provided, this replaces the default `SELECT data` with a projection SQL
140    /// that filters fields at the database level, reducing network payload.
141    ///
142    /// The projection SQL will be wrapped as `SELECT {projection_sql} as data` to maintain
143    /// the hard invariant of a single `data` column.
144    ///
145    /// This feature enables architectural consistency with PostgreSQL optimization
146    /// and prepares for future performance improvements.
147    ///
148    /// # Arguments
149    ///
150    /// * `projection_sql` - PostgreSQL expression, typically from `jsonb_build_object()`
151    ///
152    /// # Example
153    ///
154    /// ```no_run
155    /// // Requires: live Postgres connection via FraiseClient.
156    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
157    /// # use serde::Deserialize;
158    /// # #[derive(Deserialize)] struct Project { id: String, name: String }
159    /// let stream = client
160    ///     .query::<Project>("projects")
161    ///     .select_projection("jsonb_build_object('id', data->>'id', 'name', data->>'name')")
162    ///     .execute()
163    ///     .await?;
164    /// # Ok(())
165    /// # }
166    /// ```
167    ///
168    /// # Backward Compatibility
169    ///
170    /// If not specified, defaults to `SELECT data` (original behavior).
171    pub fn select_projection(mut self, projection_sql: impl Into<String>) -> Self {
172        self.custom_select = Some(projection_sql.into());
173        self
174    }
175
176    /// Set LIMIT clause to restrict result set size
177    ///
178    /// # Example
179    ///
180    /// ```no_run
181    /// // Requires: live Postgres connection via FraiseClient.
182    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
183    /// # use serde::Deserialize;
184    /// # #[derive(Deserialize)] struct Project { id: String }
185    /// let stream = client.query::<Project>("projects")
186    ///     .limit(10)
187    ///     .execute()
188    ///     .await?;
189    /// # Ok(())
190    /// # }
191    /// ```
192    pub const fn limit(mut self, count: usize) -> Self {
193        self.limit = Some(count);
194        self
195    }
196
197    /// Set OFFSET clause to skip first N rows
198    ///
199    /// # Example
200    ///
201    /// ```no_run
202    /// // Requires: live Postgres connection via FraiseClient.
203    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
204    /// # use serde::Deserialize;
205    /// # #[derive(Deserialize)] struct Project { id: String }
206    /// let stream = client.query::<Project>("projects")
207    ///     .limit(10)
208    ///     .offset(20)  // Skip first 20, return next 10
209    ///     .execute()
210    ///     .await?;
211    /// # Ok(())
212    /// # }
213    /// ```
214    pub const fn offset(mut self, count: usize) -> Self {
215        self.offset = Some(count);
216        self
217    }
218
219    /// Set chunk size (default: 256)
220    pub const fn chunk_size(mut self, size: usize) -> Self {
221        self.chunk_size = size;
222        self
223    }
224
225    /// Set maximum memory limit for buffered items (default: unbounded)
226    ///
227    /// When the estimated memory usage of buffered items exceeds this limit,
228    /// the stream will return `WireError::MemoryLimitExceeded` instead of additional items.
229    ///
230    /// Memory is estimated as: `items_buffered * 2048 bytes` (conservative for typical JSON).
231    ///
232    /// By default, `max_memory()` is None (unbounded), maintaining backward compatibility.
233    /// Only set if you need hard memory bounds.
234    ///
235    /// # Example
236    ///
237    /// ```no_run
238    /// // Requires: live Postgres connection via FraiseClient.
239    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
240    /// # use serde::Deserialize;
241    /// # #[derive(Deserialize)] struct Project { id: String }
242    /// let stream = client
243    ///     .query::<Project>("projects")
244    ///     .max_memory(500_000_000)  // 500 MB limit
245    ///     .execute()
246    ///     .await?;
247    /// # Ok(())
248    /// # }
249    /// ```
250    ///
251    /// # Interpretation
252    ///
253    /// If memory limit is exceeded:
254    /// - It indicates the consumer is too slow relative to data arrival
255    /// - The error is terminal (non-retriable) — retrying won't help
256    /// - Consider: increasing consumer throughput, reducing `chunk_size`, or removing limit
257    pub const fn max_memory(mut self, bytes: usize) -> Self {
258        self.max_memory = Some(bytes);
259        self
260    }
261
262    /// Set soft memory limit thresholds for progressive degradation
263    ///
264    /// Allows warning at a threshold before hitting hard limit.
265    /// Only applies if `max_memory()` is also set.
266    ///
267    /// # Parameters
268    ///
269    /// - `warn_threshold`: Percentage (0.0-1.0) at which to emit a warning
270    /// - `fail_threshold`: Percentage (0.0-1.0) at which to return error (must be > `warn_threshold`)
271    ///
272    /// # Example
273    ///
274    /// ```no_run
275    /// // Requires: live Postgres connection via FraiseClient.
276    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
277    /// # use serde::Deserialize;
278    /// # #[derive(Deserialize)] struct Project { id: String }
279    /// let stream = client
280    ///     .query::<Project>("projects")
281    ///     .max_memory(500_000_000)  // 500 MB hard limit
282    ///     .memory_soft_limits(0.80, 1.0)  // Warn at 80%, error at 100%
283    ///     .execute()
284    ///     .await?;
285    /// # Ok(())
286    /// # }
287    /// ```
288    ///
289    /// If only hard limit needed, skip this and just use `max_memory()`.
290    pub fn memory_soft_limits(mut self, warn_threshold: f32, fail_threshold: f32) -> Self {
291        // Validate thresholds
292        let warn = warn_threshold.clamp(0.0, 1.0);
293        let fail = fail_threshold.clamp(0.0, 1.0);
294
295        if warn < fail {
296            self.soft_limit_warn_threshold = Some(warn);
297            self.soft_limit_fail_threshold = Some(fail);
298        }
299        self
300    }
301
302    /// Enable or disable adaptive chunk sizing (default: enabled)
303    ///
304    /// Adaptive chunking automatically adjusts `chunk_size` based on channel occupancy:
305    /// - High occupancy (>80%): Decreases chunk size to reduce producer pressure
306    /// - Low occupancy (<20%): Increases chunk size to optimize batching efficiency
307    ///
308    /// Enabled by default for zero-configuration self-tuning.
309    /// Disable if you need fixed chunk sizes or encounter unexpected behavior.
310    ///
311    /// # Example
312    ///
313    /// ```no_run
314    /// // Requires: live Postgres connection via FraiseClient.
315    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
316    /// # use serde::Deserialize;
317    /// # #[derive(Deserialize)] struct Project { id: String }
318    /// let stream = client
319    ///     .query::<Project>("projects")
320    ///     .adaptive_chunking(false)  // Disable adaptive tuning
321    ///     .chunk_size(512)  // Use fixed size
322    ///     .execute()
323    ///     .await?;
324    /// # Ok(())
325    /// # }
326    /// ```
327    pub const fn adaptive_chunking(mut self, enabled: bool) -> Self {
328        self.enable_adaptive_chunking = enabled;
329        self
330    }
331
332    /// Override minimum chunk size for adaptive tuning (default: 16)
333    ///
334    /// Adaptive chunking will never decrease chunk size below this value.
335    /// Useful if you need minimum batching for performance.
336    ///
337    /// Only applies if adaptive chunking is enabled.
338    ///
339    /// # Example
340    ///
341    /// ```no_run
342    /// // Requires: live Postgres connection via FraiseClient.
343    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
344    /// # use serde::Deserialize;
345    /// # #[derive(Deserialize)] struct Project { id: String }
346    /// let stream = client
347    ///     .query::<Project>("projects")
348    ///     .adaptive_chunking(true)
349    ///     .adaptive_min_size(32)  // Don't go below 32 items per batch
350    ///     .execute()
351    ///     .await?;
352    /// # Ok(())
353    /// # }
354    /// ```
355    pub const fn adaptive_min_size(mut self, size: usize) -> Self {
356        self.adaptive_min_chunk_size = Some(size);
357        self
358    }
359
360    /// Override maximum chunk size for adaptive tuning (default: 1024)
361    ///
362    /// Adaptive chunking will never increase chunk size above this value.
363    /// Useful if you need memory bounds or latency guarantees.
364    ///
365    /// Only applies if adaptive chunking is enabled.
366    ///
367    /// # Example
368    ///
369    /// ```no_run
370    /// // Requires: live Postgres connection via FraiseClient.
371    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
372    /// # use serde::Deserialize;
373    /// # #[derive(Deserialize)] struct Project { id: String }
374    /// let stream = client
375    ///     .query::<Project>("projects")
376    ///     .adaptive_chunking(true)
377    ///     .adaptive_max_size(512)  // Cap at 512 items per batch
378    ///     .execute()
379    ///     .await?;
380    /// # Ok(())
381    /// # }
382    /// ```
383    pub const fn adaptive_max_size(mut self, size: usize) -> Self {
384        self.adaptive_max_chunk_size = Some(size);
385        self
386    }
387
388    /// Execute query and return typed stream
389    ///
390    /// Type T ONLY affects consumer-side deserialization at `poll_next()`.
391    /// SQL, filtering, ordering, and wire protocol are identical regardless of T.
392    ///
393    /// The returned stream supports pause/resume/stats for advanced stream control.
394    ///
395    /// # Examples
396    ///
397    /// With type-safe deserialization:
398    /// ```no_run
399    /// // Requires: live Postgres connection via FraiseClient.
400    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
401    /// # use serde::Deserialize;
402    /// # #[derive(Deserialize)] struct Project { id: String }
403    /// # use futures::stream::StreamExt;
404    /// let mut stream = client.query::<Project>("projects").execute().await?;
405    /// while let Some(result) = stream.next().await {
406    ///     let project: Project = result?;
407    /// }
408    /// # Ok(())
409    /// # }
410    /// ```
411    ///
412    /// With raw JSON (escape hatch):
413    /// ```no_run
414    /// // Requires: live Postgres connection via FraiseClient.
415    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
416    /// # use futures::stream::StreamExt;
417    /// let mut stream = client.query::<serde_json::Value>("projects").execute().await?;
418    /// while let Some(result) = stream.next().await {
419    ///     let json: serde_json::Value = result?;
420    /// }
421    /// # Ok(())
422    /// # }
423    /// ```
424    ///
425    /// With stream control:
426    /// ```no_run
427    /// // Requires: live Postgres connection via FraiseClient.
428    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
429    /// let mut stream = client.query::<serde_json::Value>("projects").execute().await?;
430    /// stream.pause().await?;  // Pause the stream
431    /// let stats = stream.stats();  // Get statistics
432    /// stream.resume().await?;  // Resume the stream
433    /// # Ok(())
434    /// # }
435    /// ```
436    ///
437    /// # Errors
438    ///
439    /// Returns [`WireError`] if SQL generation fails or the underlying streaming query
440    /// cannot be started on the connection.
441    pub async fn execute(self) -> Result<QueryStream<T>> {
442        let sql = self.build_sql()?;
443        tracing::debug!("executing query: {}", sql);
444
445        // Record query submission metrics
446        crate::metrics::counters::query_submitted(
447            &self.entity,
448            !self.sql_predicates.is_empty(),
449            self.rust_predicate.is_some(),
450            self.order_by.is_some(),
451        );
452
453        let stream = self
454            .client
455            .execute_query(
456                &sql,
457                self.chunk_size,
458                self.max_memory,
459                self.soft_limit_warn_threshold,
460                self.soft_limit_fail_threshold,
461            )
462            .await?;
463
464        // Create QueryStream with optional Rust predicate
465        Ok(QueryStream::new(stream, self.rust_predicate))
466    }
467
468    /// Build SQL query
469    fn build_sql(&self) -> Result<String> {
470        // Use custom SELECT clause if provided, otherwise default to "SELECT data"
471        let select_clause = if let Some(ref projection) = self.custom_select {
472            format!("SELECT {} as data", projection)
473        } else {
474            "SELECT data".to_string()
475        };
476
477        let mut sql = format!("{} FROM {}", select_clause, self.entity);
478
479        if !self.sql_predicates.is_empty() {
480            sql.push_str(" WHERE ");
481            sql.push_str(&self.sql_predicates.join(" AND "));
482        }
483
484        if let Some(ref order) = self.order_by {
485            sql.push_str(" ORDER BY ");
486            sql.push_str(order);
487        }
488
489        if let Some(limit) = self.limit {
490            sql.push_str(&format!(" LIMIT {}", limit));
491        }
492
493        if let Some(offset) = self.offset {
494            sql.push_str(&format!(" OFFSET {}", offset));
495        }
496
497        Ok(sql)
498    }
499}
500
#[cfg(test)]
mod tests {

    /// Builds `SELECT data FROM {entity}` plus optional `WHERE` / `ORDER BY` clauses.
    ///
    /// This is a standalone re-implementation of the SQL layout used by the tests
    /// below; it does not call `QueryBuilder::build_sql`.
    fn build_test_sql(entity: &str, predicates: Vec<&str>, order_by: Option<&str>) -> String {
        let where_clause = if predicates.is_empty() {
            String::new()
        } else {
            format!(" WHERE {}", predicates.join(" AND "))
        };
        let order_clause = order_by.map_or_else(String::new, |order| format!(" ORDER BY {order}"));

        format!("SELECT data FROM {entity}{where_clause}{order_clause}")
    }

    #[test]
    fn test_build_sql_simple() {
        assert_eq!(build_test_sql("user", Vec::new(), None), "SELECT data FROM user");
    }

    #[test]
    fn test_build_sql_with_where() {
        let expected = "SELECT data FROM user WHERE data->>'status' = 'active'";
        let actual = build_test_sql("user", vec!["data->>'status' = 'active'"], None);
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_build_sql_with_order() {
        let expected = "SELECT data FROM user ORDER BY data->>'name' ASC";
        let actual = build_test_sql("user", Vec::new(), Some("data->>'name' ASC"));
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_build_sql_with_limit() {
        let sql = ["SELECT data FROM user", " LIMIT 10"].concat();
        assert_eq!(sql, "SELECT data FROM user LIMIT 10");
    }

    #[test]
    fn test_build_sql_with_offset() {
        let sql = ["SELECT data FROM user", " OFFSET 20"].concat();
        assert_eq!(sql, "SELECT data FROM user OFFSET 20");
    }

    #[test]
    fn test_build_sql_with_limit_and_offset() {
        let sql = ["SELECT data FROM user", " LIMIT 10", " OFFSET 20"].concat();
        assert_eq!(sql, "SELECT data FROM user LIMIT 10 OFFSET 20");
    }

    #[test]
    fn test_build_sql_complete() {
        let sql = [
            "SELECT data FROM user",
            " WHERE data->>'status' = 'active'",
            " ORDER BY data->>'name' ASC",
            " LIMIT 10",
            " OFFSET 20",
        ]
        .concat();
        assert_eq!(
            sql,
            "SELECT data FROM user WHERE data->>'status' = 'active' ORDER BY data->>'name' ASC LIMIT 10 OFFSET 20"
        );
    }

    // Projection tests
    #[test]
    fn test_build_sql_default_select() {
        let sql = build_test_sql("users", Vec::new(), None);
        assert!(sql.starts_with("SELECT data FROM"));
        assert_eq!(sql, "SELECT data FROM users");
    }

    #[test]
    fn test_projection_single_field() {
        let sql = String::from("SELECT jsonb_build_object('id', data->>'id') as data FROM users");
        assert!(sql.contains("as data"));
        assert!(sql.starts_with("SELECT jsonb_build_object("));
        assert!(sql.contains("FROM users"));
    }

    #[test]
    fn test_projection_multiple_fields() {
        let projection =
            "jsonb_build_object('id', data->>'id', 'name', data->>'name', 'email', data->>'email')";
        let sql = format!("SELECT {projection} as data FROM users");
        for needle in [
            "as data FROM users",
            "jsonb_build_object(",
            "'id'",
            "'name'",
            "'email'",
        ] {
            assert!(sql.contains(needle));
        }
    }

    #[test]
    fn test_projection_with_where_clause() {
        let projection = "jsonb_build_object('id', data->>'id')";
        let sql = format!("SELECT {projection} as data FROM users WHERE data->>'status' = 'active'");
        for needle in [
            "SELECT jsonb_build_object(",
            "as data FROM users",
            "WHERE data->>'status' = 'active'",
        ] {
            assert!(sql.contains(needle));
        }
    }

    #[test]
    fn test_projection_with_order_by() {
        let projection = "jsonb_build_object('id', data->>'id')";
        let sql = format!("SELECT {projection} as data FROM users ORDER BY data->>'name' ASC");
        for needle in ["SELECT jsonb_build_object(", "ORDER BY data->>'name' ASC"] {
            assert!(sql.contains(needle));
        }
    }

    #[test]
    fn test_projection_with_limit() {
        let projection = "jsonb_build_object('id', data->>'id')";
        let sql = format!("SELECT {projection} as data FROM users LIMIT 1000");
        for needle in ["as data FROM users", "LIMIT 1000"] {
            assert!(sql.contains(needle));
        }
    }

    #[test]
    fn test_projection_with_offset() {
        let projection = "jsonb_build_object('id', data->>'id')";
        let sql = format!("SELECT {projection} as data FROM users OFFSET 500");
        for needle in ["as data FROM users", "OFFSET 500"] {
            assert!(sql.contains(needle));
        }
    }

    #[test]
    fn test_projection_full_pipeline() {
        let projection =
            "jsonb_build_object('user_id', data->>'user_id', 'event_type', data->>'event_type')";
        let sql = [
            format!("SELECT {projection} as data FROM events").as_str(),
            " WHERE event_type IN ('purchase', 'view')",
            " ORDER BY timestamp DESC",
            " LIMIT 5000",
        ]
        .concat();
        for needle in [
            "SELECT jsonb_build_object(",
            "'user_id'",
            "'event_type'",
            "as data FROM events",
            "WHERE event_type IN ('purchase', 'view')",
            "ORDER BY timestamp DESC",
            "LIMIT 5000",
        ] {
            assert!(sql.contains(needle));
        }
    }

    // Stream pipeline integration tests
    #[test]
    fn test_typed_stream_with_value_type() {
        // Compile-time check: a raw JSON stream can be wrapped in TypedJsonStream.
        use crate::stream::TypedJsonStream;
        use futures::stream;

        let rows = vec![
            Ok(serde_json::json!({"id": "1", "name": "Alice"})),
            Ok(serde_json::json!({"id": "2", "name": "Bob"})),
        ];

        let typed_stream: TypedJsonStream<serde_json::Value> =
            TypedJsonStream::new(Box::new(stream::iter(rows)));

        // The coercion below only succeeds if the stream has the expected item type.
        let _stream: Box<
            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Unpin,
        > = Box::new(typed_stream);
    }

    #[test]
    fn test_filtered_stream_with_typed_output() {
        // Compile-time check: FilteredStream can sit in front of TypedJsonStream.
        use crate::stream::{FilteredStream, TypedJsonStream};
        use futures::stream;

        let rows = vec![
            Ok(serde_json::json!({"id": 1, "active": true})),
            Ok(serde_json::json!({"id": 2, "active": false})),
            Ok(serde_json::json!({"id": 3, "active": true})),
        ];

        let is_active = Box::new(|row: &serde_json::Value| row["active"].as_bool().unwrap_or(false));
        let filtered = FilteredStream::new(stream::iter(rows), is_active);

        let typed_stream: TypedJsonStream<serde_json::Value> =
            TypedJsonStream::new(Box::new(filtered));

        // The coercion below only succeeds if the whole pipeline type-checks.
        let _stream: Box<
            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Unpin,
        > = Box::new(typed_stream);
    }

    #[test]
    fn test_stream_pipeline_type_flow() {
        // Compile-time check of the full type flow:
        // JsonStream (Result<Value>) → FilteredStream (Result<Value>) → TypedJsonStream<T> (Result<T>)
        use crate::stream::{FilteredStream, TypedJsonStream};
        use futures::stream;
        use serde::Deserialize;

        #[derive(Deserialize, Debug)]
        #[allow(dead_code)] // Reason: fixture fields exist only as a deserialization target and are never read
        struct TestUser {
            id: String,
            active: bool,
        }

        let rows = vec![
            Ok(serde_json::json!({"id": "1", "active": true})),
            Ok(serde_json::json!({"id": "2", "active": false})),
        ];

        // Step 1: FilteredStream applies the predicate to JSON values.
        let is_active: Box<dyn Fn(&serde_json::Value) -> bool + Send> =
            Box::new(|row| row["active"].as_bool().unwrap_or(false));
        let filtered: Box<
            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Send + Unpin,
        > = Box::new(FilteredStream::new(stream::iter(rows), is_active));

        // Step 2: TypedJsonStream targets TestUser.
        let typed: TypedJsonStream<TestUser> = TypedJsonStream::new(filtered);

        // The coercion below verifies that:
        // - FilteredStream outputs Result<Value>
        // - TypedJsonStream<T> takes Box<dyn Stream<Item = Result<Value>>>
        // - TypedJsonStream<T> outputs Result<T>
        let _final_stream: Box<
            dyn futures::stream::Stream<Item = crate::Result<TestUser>> + Unpin,
        > = Box::new(typed);
    }
}