// fraiseql_wire/client/query_builder.rs
//! Query builder API
//!
//! Generic query builder that supports automatic JSON deserialization to target types.
//!
//! **IMPORTANT**: Type T is **consumer-side only**.
//!
//! Type T does NOT affect:
//! - SQL generation (always `SELECT data FROM {entity}`)
//! - Filtering (where_sql, where_rust, order_by)
//! - Wire protocol (identical for all T)
//!
//! Type T ONLY affects:
//! - Consumer-side deserialization at poll_next()
//! - Error messages (type name included)

16use crate::client::FraiseClient;
17use crate::stream::QueryStream;
18use crate::Result;
19use serde::de::DeserializeOwned;
20use serde_json::Value;
21use std::marker::PhantomData;
22
23/// Type alias for a Rust-side predicate function
24type RustPredicate = Box<dyn Fn(&Value) -> bool + Send>;
25
26/// Generic query builder
27///
28/// The type parameter T controls consumer-side deserialization only.
29/// Default type T = serde_json::Value for backward compatibility.
30///
31/// # Examples
32///
33/// Type-safe query (recommended):
34/// ```ignore
35/// use serde::Deserialize;
36///
37/// #[derive(Deserialize)]
38/// struct Project {
39///     id: String,
40///     name: String,
41/// }
42///
43/// let stream = client.query::<Project>("projects")
44///     .where_sql("status='active'")
45///     .execute()
46///     .await?;
47/// ```
48///
49/// Raw JSON query (debugging, forward compatibility):
50/// ```ignore
51/// let stream = client.query::<serde_json::Value>("projects")
52///     .execute()
53///     .await?;
54/// ```
55pub struct QueryBuilder<T: DeserializeOwned + Unpin + 'static = serde_json::Value> {
56    client: FraiseClient,
57    entity: String,
58    sql_predicates: Vec<String>,
59    rust_predicate: Option<RustPredicate>,
60    order_by: Option<String>,
61    limit: Option<usize>,
62    offset: Option<usize>,
63    chunk_size: usize,
64    max_memory: Option<usize>,
65    soft_limit_warn_threshold: Option<f32>, // Percentage (0.0-1.0) at which to warn
66    soft_limit_fail_threshold: Option<f32>, // Percentage (0.0-1.0) at which to error
67    enable_adaptive_chunking: bool,
68    adaptive_min_chunk_size: Option<usize>,
69    adaptive_max_chunk_size: Option<usize>,
70    custom_select: Option<String>, // Optional custom SELECT clause for SQL projection
71    _phantom: PhantomData<T>,
72}
73
74impl<T: DeserializeOwned + Unpin + 'static> QueryBuilder<T> {
75    /// Create new query builder
76    pub(crate) fn new(client: FraiseClient, entity: impl Into<String>) -> Self {
77        Self {
78            client,
79            entity: entity.into(),
80            sql_predicates: Vec::new(),
81            rust_predicate: None,
82            order_by: None,
83            limit: None,
84            offset: None,
85            chunk_size: 256,
86            max_memory: None,
87            soft_limit_warn_threshold: None,
88            soft_limit_fail_threshold: None,
89            enable_adaptive_chunking: true, // Enabled by default
90            adaptive_min_chunk_size: None,
91            adaptive_max_chunk_size: None,
92            custom_select: None,
93            _phantom: PhantomData,
94        }
95    }
96
97    /// Add SQL WHERE clause predicate
98    ///
99    /// Type T does NOT affect SQL generation.
100    /// Multiple predicates are AND'ed together.
101    pub fn where_sql(mut self, predicate: impl Into<String>) -> Self {
102        self.sql_predicates.push(predicate.into());
103        self
104    }
105
106    /// Add Rust-side predicate
107    ///
108    /// Type T does NOT affect filtering.
109    /// Applied after SQL filtering, runs on streamed JSON values.
110    /// Predicates receive &serde_json::Value regardless of T.
111    pub fn where_rust<F>(mut self, predicate: F) -> Self
112    where
113        F: Fn(&Value) -> bool + Send + 'static,
114    {
115        self.rust_predicate = Some(Box::new(predicate));
116        self
117    }
118
119    /// Set ORDER BY clause
120    ///
121    /// Type T does NOT affect ordering.
122    pub fn order_by(mut self, order: impl Into<String>) -> Self {
123        self.order_by = Some(order.into());
124        self
125    }
126
127    /// Set a custom SELECT clause for SQL projection optimization
128    ///
129    /// When provided, this replaces the default `SELECT data` with a projection SQL
130    /// that filters fields at the database level, reducing network payload.
131    ///
132    /// The projection SQL will be wrapped as `SELECT {projection_sql} as data` to maintain
133    /// the hard invariant of a single `data` column.
134    ///
135    /// This feature enables architectural consistency with PostgreSQL optimization
136    /// and prepares for future performance improvements.
137    ///
138    /// # Arguments
139    ///
140    /// * `projection_sql` - PostgreSQL expression, typically from `jsonb_build_object()`
141    ///
142    /// # Example
143    ///
144    /// ```ignore
145    /// let stream = client
146    ///     .query::<Project>("projects")
147    ///     .select_projection("jsonb_build_object('id', data->>'id', 'name', data->>'name')")
148    ///     .execute()
149    ///     .await?;
150    /// ```
151    ///
152    /// # Backward Compatibility
153    ///
154    /// If not specified, defaults to `SELECT data` (original behavior).
155    pub fn select_projection(mut self, projection_sql: impl Into<String>) -> Self {
156        self.custom_select = Some(projection_sql.into());
157        self
158    }
159
160    /// Set LIMIT clause to restrict result set size
161    ///
162    /// # Example
163    ///
164    /// ```ignore
165    /// let stream = client.query::<Project>("projects")
166    ///     .limit(10)
167    ///     .execute()
168    ///     .await?;
169    /// ```
170    pub fn limit(mut self, count: usize) -> Self {
171        self.limit = Some(count);
172        self
173    }
174
175    /// Set OFFSET clause to skip first N rows
176    ///
177    /// # Example
178    ///
179    /// ```ignore
180    /// let stream = client.query::<Project>("projects")
181    ///     .limit(10)
182    ///     .offset(20)  // Skip first 20, return next 10
183    ///     .execute()
184    ///     .await?;
185    /// ```
186    pub fn offset(mut self, count: usize) -> Self {
187        self.offset = Some(count);
188        self
189    }
190
191    /// Set chunk size (default: 256)
192    pub fn chunk_size(mut self, size: usize) -> Self {
193        self.chunk_size = size;
194        self
195    }
196
197    /// Set maximum memory limit for buffered items (default: unbounded)
198    ///
199    /// When the estimated memory usage of buffered items exceeds this limit,
200    /// the stream will return `Error::MemoryLimitExceeded` instead of additional items.
201    ///
202    /// Memory is estimated as: `items_buffered * 2048 bytes` (conservative for typical JSON).
203    ///
204    /// By default, `max_memory()` is None (unbounded), maintaining backward compatibility.
205    /// Only set if you need hard memory bounds.
206    ///
207    /// # Example
208    ///
209    /// ```ignore
210    /// let stream = client
211    ///     .query::<Project>("projects")
212    ///     .max_memory(500_000_000)  // 500 MB limit
213    ///     .execute()
214    ///     .await?;
215    /// ```
216    ///
217    /// # Interpretation
218    ///
219    /// If memory limit is exceeded:
220    /// - It indicates the consumer is too slow relative to data arrival
221    /// - The error is terminal (non-retriable) — retrying won't help
222    /// - Consider: increasing consumer throughput, reducing chunk_size, or removing limit
223    pub fn max_memory(mut self, bytes: usize) -> Self {
224        self.max_memory = Some(bytes);
225        self
226    }
227
228    /// Set soft memory limit thresholds for progressive degradation
229    ///
230    /// Allows warning at a threshold before hitting hard limit.
231    /// Only applies if `max_memory()` is also set.
232    ///
233    /// # Parameters
234    ///
235    /// - `warn_threshold`: Percentage (0.0-1.0) at which to emit a warning
236    /// - `fail_threshold`: Percentage (0.0-1.0) at which to return error (must be > warn_threshold)
237    ///
238    /// # Example
239    ///
240    /// ```ignore
241    /// let stream = client
242    ///     .query::<Project>("projects")
243    ///     .max_memory(500_000_000)  // 500 MB hard limit
244    ///     .memory_soft_limits(0.80, 1.0)  // Warn at 80%, error at 100%
245    ///     .execute()
246    ///     .await?;
247    /// ```
248    ///
249    /// If only hard limit needed, skip this and just use `max_memory()`.
250    pub fn memory_soft_limits(mut self, warn_threshold: f32, fail_threshold: f32) -> Self {
251        // Validate thresholds
252        let warn = warn_threshold.clamp(0.0, 1.0);
253        let fail = fail_threshold.clamp(0.0, 1.0);
254
255        if warn < fail {
256            self.soft_limit_warn_threshold = Some(warn);
257            self.soft_limit_fail_threshold = Some(fail);
258        }
259        self
260    }
261
262    /// Enable or disable adaptive chunk sizing (default: enabled)
263    ///
264    /// Adaptive chunking automatically adjusts `chunk_size` based on channel occupancy:
265    /// - High occupancy (>80%): Decreases chunk size to reduce producer pressure
266    /// - Low occupancy (<20%): Increases chunk size to optimize batching efficiency
267    ///
268    /// Enabled by default for zero-configuration self-tuning.
269    /// Disable if you need fixed chunk sizes or encounter unexpected behavior.
270    ///
271    /// # Example
272    ///
273    /// ```ignore
274    /// let stream = client
275    ///     .query::<Project>("projects")
276    ///     .adaptive_chunking(false)  // Disable adaptive tuning
277    ///     .chunk_size(512)  // Use fixed size
278    ///     .execute()
279    ///     .await?;
280    /// ```
281    pub fn adaptive_chunking(mut self, enabled: bool) -> Self {
282        self.enable_adaptive_chunking = enabled;
283        self
284    }
285
286    /// Override minimum chunk size for adaptive tuning (default: 16)
287    ///
288    /// Adaptive chunking will never decrease chunk size below this value.
289    /// Useful if you need minimum batching for performance.
290    ///
291    /// Only applies if adaptive chunking is enabled.
292    ///
293    /// # Example
294    ///
295    /// ```ignore
296    /// let stream = client
297    ///     .query::<Project>("projects")
298    ///     .adaptive_chunking(true)
299    ///     .adaptive_min_size(32)  // Don't go below 32 items per batch
300    ///     .execute()
301    ///     .await?;
302    /// ```
303    pub fn adaptive_min_size(mut self, size: usize) -> Self {
304        self.adaptive_min_chunk_size = Some(size);
305        self
306    }
307
308    /// Override maximum chunk size for adaptive tuning (default: 1024)
309    ///
310    /// Adaptive chunking will never increase chunk size above this value.
311    /// Useful if you need memory bounds or latency guarantees.
312    ///
313    /// Only applies if adaptive chunking is enabled.
314    ///
315    /// # Example
316    ///
317    /// ```ignore
318    /// let stream = client
319    ///     .query::<Project>("projects")
320    ///     .adaptive_chunking(true)
321    ///     .adaptive_max_size(512)  // Cap at 512 items per batch
322    ///     .execute()
323    ///     .await?;
324    /// ```
325    pub fn adaptive_max_size(mut self, size: usize) -> Self {
326        self.adaptive_max_chunk_size = Some(size);
327        self
328    }
329
330    /// Execute query and return typed stream
331    ///
332    /// Type T ONLY affects consumer-side deserialization at poll_next().
333    /// SQL, filtering, ordering, and wire protocol are identical regardless of T.
334    ///
335    /// The returned stream supports pause/resume/stats for advanced stream control.
336    ///
337    /// # Examples
338    ///
339    /// With type-safe deserialization:
340    /// ```ignore
341    /// let stream = client.query::<Project>("projects").execute().await?;
342    /// while let Some(result) = stream.next().await {
343    ///     let project: Project = result?;
344    /// }
345    /// ```
346    ///
347    /// With raw JSON (escape hatch):
348    /// ```ignore
349    /// let stream = client.query::<serde_json::Value>("projects").execute().await?;
350    /// while let Some(result) = stream.next().await {
351    ///     let json: Value = result?;
352    /// }
353    /// ```
354    ///
355    /// With stream control:
356    /// ```ignore
357    /// let mut stream = client.query::<serde_json::Value>("projects").execute().await?;
358    /// stream.pause().await?;  // Pause the stream
359    /// let stats = stream.stats();  // Get statistics
360    /// stream.resume().await?;  // Resume the stream
361    /// ```
362    pub async fn execute(self) -> Result<QueryStream<T>> {
363        let sql = self.build_sql()?;
364        tracing::debug!("executing query: {}", sql);
365
366        // Record query submission metrics
367        crate::metrics::counters::query_submitted(
368            &self.entity,
369            !self.sql_predicates.is_empty(),
370            self.rust_predicate.is_some(),
371            self.order_by.is_some(),
372        );
373
374        let stream = self
375            .client
376            .execute_query(
377                &sql,
378                self.chunk_size,
379                self.max_memory,
380                self.soft_limit_warn_threshold,
381                self.soft_limit_fail_threshold,
382            )
383            .await?;
384
385        // Create QueryStream with optional Rust predicate
386        Ok(QueryStream::new(stream, self.rust_predicate))
387    }
388
389    /// Build SQL query
390    fn build_sql(&self) -> Result<String> {
391        // Use custom SELECT clause if provided, otherwise default to "SELECT data"
392        let select_clause = if let Some(ref projection) = self.custom_select {
393            format!("SELECT {} as data", projection)
394        } else {
395            "SELECT data".to_string()
396        };
397
398        let mut sql = format!("{} FROM {}", select_clause, self.entity);
399
400        if !self.sql_predicates.is_empty() {
401            sql.push_str(" WHERE ");
402            sql.push_str(&self.sql_predicates.join(" AND "));
403        }
404
405        if let Some(ref order) = self.order_by {
406            sql.push_str(" ORDER BY ");
407            sql.push_str(order);
408        }
409
410        if let Some(limit) = self.limit {
411            sql.push_str(&format!(" LIMIT {}", limit));
412        }
413
414        if let Some(offset) = self.offset {
415            sql.push_str(&format!(" OFFSET {}", offset));
416        }
417
418        Ok(sql)
419    }
420}
#[cfg(test)]
mod tests {

    /// Mirror of `QueryBuilder::build_sql` clause assembly for the default
    /// `SELECT data` projection (no pagination). Kept so existing tests can
    /// pin SQL shape without constructing a live `FraiseClient`.
    fn build_test_sql(entity: &str, predicates: Vec<&str>, order_by: Option<&str>) -> String {
        build_test_sql_paged(entity, predicates, order_by, None, None)
    }

    /// Full mirror of `QueryBuilder::build_sql` clause assembly, including
    /// LIMIT/OFFSET, in the same clause order as the production code.
    fn build_test_sql_paged(
        entity: &str,
        predicates: Vec<&str>,
        order_by: Option<&str>,
        limit: Option<usize>,
        offset: Option<usize>,
    ) -> String {
        let mut sql = format!("SELECT data FROM {}", entity);
        if !predicates.is_empty() {
            sql.push_str(" WHERE ");
            sql.push_str(&predicates.join(" AND "));
        }
        if let Some(order) = order_by {
            sql.push_str(" ORDER BY ");
            sql.push_str(order);
        }
        if let Some(n) = limit {
            sql.push_str(&format!(" LIMIT {}", n));
        }
        if let Some(n) = offset {
            sql.push_str(&format!(" OFFSET {}", n));
        }
        sql
    }

    #[test]
    fn test_build_sql_simple() {
        let sql = build_test_sql("user", vec![], None);
        assert_eq!(sql, "SELECT data FROM user");
    }

    #[test]
    fn test_build_sql_with_where() {
        let sql = build_test_sql("user", vec!["data->>'status' = 'active'"], None);
        assert_eq!(
            sql,
            "SELECT data FROM user WHERE data->>'status' = 'active'"
        );
    }

    #[test]
    fn test_build_sql_with_order() {
        let sql = build_test_sql("user", vec![], Some("data->>'name' ASC"));
        assert_eq!(sql, "SELECT data FROM user ORDER BY data->>'name' ASC");
    }

    #[test]
    fn test_build_sql_with_limit() {
        let sql = build_test_sql_paged("user", vec![], None, Some(10), None);
        assert_eq!(sql, "SELECT data FROM user LIMIT 10");
    }

    #[test]
    fn test_build_sql_with_offset() {
        let sql = build_test_sql_paged("user", vec![], None, None, Some(20));
        assert_eq!(sql, "SELECT data FROM user OFFSET 20");
    }

    #[test]
    fn test_build_sql_with_limit_and_offset() {
        let sql = build_test_sql_paged("user", vec![], None, Some(10), Some(20));
        assert_eq!(sql, "SELECT data FROM user LIMIT 10 OFFSET 20");
    }

    #[test]
    fn test_build_sql_complete() {
        let sql = build_test_sql_paged(
            "user",
            vec!["data->>'status' = 'active'"],
            Some("data->>'name' ASC"),
            Some(10),
            Some(20),
        );
        assert_eq!(
            sql,
            "SELECT data FROM user WHERE data->>'status' = 'active' ORDER BY data->>'name' ASC LIMIT 10 OFFSET 20"
        );
    }

    // Projection tests
    #[test]
    fn test_build_sql_default_select() {
        let sql = build_test_sql("users", vec![], None);
        assert!(sql.starts_with("SELECT data FROM"));
        assert_eq!(sql, "SELECT data FROM users");
    }

    #[test]
    fn test_projection_single_field() {
        let sql = "SELECT jsonb_build_object('id', data->>'id') as data FROM users".to_string();
        assert!(sql.contains("as data"));
        assert!(sql.starts_with("SELECT jsonb_build_object("));
        assert!(sql.contains("FROM users"));
    }

    #[test]
    fn test_projection_multiple_fields() {
        let projection =
            "jsonb_build_object('id', data->>'id', 'name', data->>'name', 'email', data->>'email')";
        let sql = format!("SELECT {} as data FROM users", projection);
        assert!(sql.contains("as data FROM users"));
        assert!(sql.contains("jsonb_build_object("));
        assert!(sql.contains("'id'"));
        assert!(sql.contains("'name'"));
        assert!(sql.contains("'email'"));
    }

    #[test]
    fn test_projection_with_where_clause() {
        let projection = "jsonb_build_object('id', data->>'id')";
        let mut sql = format!("SELECT {} as data FROM users", projection);
        sql.push_str(" WHERE data->>'status' = 'active'");
        assert!(sql.contains("SELECT jsonb_build_object("));
        assert!(sql.contains("as data FROM users"));
        assert!(sql.contains("WHERE data->>'status' = 'active'"));
    }

    #[test]
    fn test_projection_with_order_by() {
        let projection = "jsonb_build_object('id', data->>'id')";
        let mut sql = format!("SELECT {} as data FROM users", projection);
        sql.push_str(" ORDER BY data->>'name' ASC");
        assert!(sql.contains("SELECT jsonb_build_object("));
        assert!(sql.contains("ORDER BY data->>'name' ASC"));
    }

    #[test]
    fn test_projection_with_limit() {
        let projection = "jsonb_build_object('id', data->>'id')";
        let mut sql = format!("SELECT {} as data FROM users", projection);
        sql.push_str(" LIMIT 1000");
        assert!(sql.contains("as data FROM users"));
        assert!(sql.contains("LIMIT 1000"));
    }

    #[test]
    fn test_projection_with_offset() {
        let projection = "jsonb_build_object('id', data->>'id')";
        let mut sql = format!("SELECT {} as data FROM users", projection);
        sql.push_str(" OFFSET 500");
        assert!(sql.contains("as data FROM users"));
        assert!(sql.contains("OFFSET 500"));
    }

    #[test]
    fn test_projection_full_pipeline() {
        let projection =
            "jsonb_build_object('user_id', data->>'user_id', 'event_type', data->>'event_type')";
        let mut sql = format!("SELECT {} as data FROM events", projection);
        sql.push_str(" WHERE event_type IN ('purchase', 'view')");
        sql.push_str(" ORDER BY timestamp DESC");
        sql.push_str(" LIMIT 5000");
        assert!(sql.contains("SELECT jsonb_build_object("));
        assert!(sql.contains("'user_id'"));
        assert!(sql.contains("'event_type'"));
        assert!(sql.contains("as data FROM events"));
        assert!(sql.contains("WHERE event_type IN ('purchase', 'view')"));
        assert!(sql.contains("ORDER BY timestamp DESC"));
        assert!(sql.contains("LIMIT 5000"));
    }

    // Stream pipeline integration tests
    #[test]
    fn test_typed_stream_with_value_type() {
        // Verify that TypedJsonStream can wrap a raw JSON stream
        use crate::stream::TypedJsonStream;
        use futures::stream;

        let values = vec![
            Ok(serde_json::json!({"id": "1", "name": "Alice"})),
            Ok(serde_json::json!({"id": "2", "name": "Bob"})),
        ];

        let json_stream = stream::iter(values);
        let typed_stream: TypedJsonStream<serde_json::Value> =
            TypedJsonStream::new(Box::new(json_stream));

        // This verifies the stream compiles and has correct type
        let _stream: Box<
            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Unpin,
        > = Box::new(typed_stream);
    }

    #[test]
    fn test_filtered_stream_with_typed_output() {
        // Verify that FilteredStream correctly filters before TypedJsonStream
        use crate::stream::{FilteredStream, TypedJsonStream};
        use futures::stream;

        let values = vec![
            Ok(serde_json::json!({"id": 1, "active": true})),
            Ok(serde_json::json!({"id": 2, "active": false})),
            Ok(serde_json::json!({"id": 3, "active": true})),
        ];

        let json_stream = stream::iter(values);
        let predicate = Box::new(|v: &serde_json::Value| v["active"].as_bool().unwrap_or(false));

        let filtered = FilteredStream::new(json_stream, predicate);
        let typed_stream: TypedJsonStream<serde_json::Value> =
            TypedJsonStream::new(Box::new(filtered));

        // This verifies the full pipeline compiles
        let _stream: Box<
            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Unpin,
        > = Box::new(typed_stream);
    }

    #[test]
    fn test_stream_pipeline_type_flow() {
        // Comprehensive test of stream type compatibility:
        // JsonStream (Result<Value>) → FilteredStream (Result<Value>) → TypedJsonStream<T> (Result<T>)
        use crate::stream::{FilteredStream, TypedJsonStream};
        use futures::stream;
        use serde::Deserialize;

        #[derive(Deserialize, Debug)]
        // Reason: test fixture struct used only for deserialization verification
        #[allow(dead_code)]
        struct TestUser {
            id: String,
            active: bool,
        }

        let values = vec![
            Ok(serde_json::json!({"id": "1", "active": true})),
            Ok(serde_json::json!({"id": "2", "active": false})),
        ];

        let json_stream = stream::iter(values);

        // Step 1: FilteredStream filters JSON values
        let predicate: Box<dyn Fn(&serde_json::Value) -> bool + Send> =
            Box::new(|v| v["active"].as_bool().unwrap_or(false));
        let filtered: Box<
            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Unpin,
        > = Box::new(FilteredStream::new(json_stream, predicate));

        // Step 2: TypedJsonStream deserializes to TestUser
        let typed: TypedJsonStream<TestUser> = TypedJsonStream::new(filtered);

        // This verifies type system is compatible:
        // - FilteredStream outputs Result<Value>
        // - TypedJsonStream<T> takes Box<dyn Stream<Item = Result<Value>>>
        // - TypedJsonStream<T> outputs Result<T>
        let _final_stream: Box<
            dyn futures::stream::Stream<Item = crate::Result<TestUser>> + Unpin,
        > = Box::new(typed);
    }
}