// Source: fraiseql_wire/client/query_builder/mod.rs

//! Query builder API
//!
//! Generic query builder that supports automatic JSON deserialization to target types.
//!
//! **IMPORTANT**: Type T is **consumer-side only**.
//!
//! Type T does NOT affect:
//! - SQL generation (`SELECT data FROM {entity}` by default, or
//!   `SELECT {projection} as data FROM {entity}` when `select_projection` is used)
//! - Filtering (`where_sql`, `where_rust`) and ordering (`order_by`)
//! - Wire protocol (identical for all T)
//!
//! Type T ONLY affects:
//! - Consumer-side deserialization at `poll_next()`
//! - Error messages (type name included)
15
16use crate::client::FraiseClient;
17#[allow(unused_imports)] // Reason: used only in doc links for `# Errors` sections
18use crate::error::WireError;
19use crate::stream::QueryStream;
20use crate::Result;
21use serde::de::DeserializeOwned;
22use serde_json::Value;
23use std::marker::PhantomData;
24
/// Type alias for a Rust-side predicate function.
///
/// A boxed, type-erased closure that inspects a `serde_json::Value` and returns a
/// `bool`. `QueryBuilder::where_rust` stores one of these and `QueryBuilder::execute`
/// hands it to `QueryStream::new`. The alias requires `Send` (but not `Sync`), so any
/// closure stored here must be `Send`.
type RustPredicate = Box<dyn Fn(&Value) -> bool + Send>;
27
28/// Generic query builder
29///
30/// The type parameter T controls consumer-side deserialization only.
31/// Default type T = `serde_json::Value` for backward compatibility.
32///
33/// # Examples
34///
35/// Type-safe query (recommended):
36/// ```no_run
37/// // Requires: live Postgres connection via FraiseClient.
38/// use serde::Deserialize;
39///
40/// #[derive(Deserialize)]
41/// struct Project {
42///     id: String,
43///     name: String,
44/// }
45/// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
46/// let stream = client.query::<Project>("projects")
47///     .where_sql("status='active'")
48///     .execute()
49///     .await?;
50/// # Ok(())
51/// # }
52/// ```
53///
54/// Raw JSON query (debugging, forward compatibility):
55/// ```no_run
56/// // Requires: live Postgres connection via FraiseClient.
57/// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
58/// let stream = client.query::<serde_json::Value>("projects")
59///     .execute()
60///     .await?;
61/// # Ok(())
62/// # }
63/// ```
64#[must_use = "call .execute() to run the query"]
65pub struct QueryBuilder<T: DeserializeOwned + Unpin + 'static = serde_json::Value> {
66    client: FraiseClient,
67    entity: String,
68    sql_predicates: Vec<String>,
69    rust_predicate: Option<RustPredicate>,
70    order_by: Option<String>,
71    limit: Option<usize>,
72    offset: Option<usize>,
73    chunk_size: usize,
74    max_memory: Option<usize>,
75    soft_limit_warn_threshold: Option<f32>, // Percentage (0.0-1.0) at which to warn
76    soft_limit_fail_threshold: Option<f32>, // Percentage (0.0-1.0) at which to error
77    enable_adaptive_chunking: bool,
78    adaptive_min_chunk_size: Option<usize>,
79    adaptive_max_chunk_size: Option<usize>,
80    custom_select: Option<String>, // Optional custom SELECT clause for SQL projection
81    _phantom: PhantomData<T>,
82}
83
84impl<T: DeserializeOwned + Unpin + 'static> QueryBuilder<T> {
85    /// Create new query builder
86    pub(crate) fn new(client: FraiseClient, entity: impl Into<String>) -> Self {
87        Self {
88            client,
89            entity: entity.into(),
90            sql_predicates: Vec::new(),
91            rust_predicate: None,
92            order_by: None,
93            limit: None,
94            offset: None,
95            chunk_size: 256,
96            max_memory: None,
97            soft_limit_warn_threshold: None,
98            soft_limit_fail_threshold: None,
99            enable_adaptive_chunking: true, // Enabled by default
100            adaptive_min_chunk_size: None,
101            adaptive_max_chunk_size: None,
102            custom_select: None,
103            _phantom: PhantomData,
104        }
105    }
106
107    /// Add SQL WHERE clause predicate
108    ///
109    /// Type T does NOT affect SQL generation.
110    /// Multiple predicates are AND'ed together.
111    pub fn where_sql(mut self, predicate: impl Into<String>) -> Self {
112        self.sql_predicates.push(predicate.into());
113        self
114    }
115
116    /// Add Rust-side predicate
117    ///
118    /// Type T does NOT affect filtering.
119    /// Applied after SQL filtering, runs on streamed JSON values.
120    /// Predicates receive &`serde_json::Value` regardless of T.
121    pub fn where_rust<F>(mut self, predicate: F) -> Self
122    where
123        F: Fn(&Value) -> bool + Send + 'static,
124    {
125        self.rust_predicate = Some(Box::new(predicate));
126        self
127    }
128
129    /// Set ORDER BY clause
130    ///
131    /// Type T does NOT affect ordering.
132    pub fn order_by(mut self, order: impl Into<String>) -> Self {
133        self.order_by = Some(order.into());
134        self
135    }
136
137    /// Set a custom SELECT clause for SQL projection optimization
138    ///
139    /// When provided, this replaces the default `SELECT data` with a projection SQL
140    /// that filters fields at the database level, reducing network payload.
141    ///
142    /// The projection SQL will be wrapped as `SELECT {projection_sql} as data` to maintain
143    /// the hard invariant of a single `data` column.
144    ///
145    /// This feature enables architectural consistency with PostgreSQL optimization
146    /// and prepares for future performance improvements.
147    ///
148    /// # Arguments
149    ///
150    /// * `projection_sql` - PostgreSQL expression, typically from `jsonb_build_object()`
151    ///
152    /// # Example
153    ///
154    /// ```no_run
155    /// // Requires: live Postgres connection via FraiseClient.
156    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
157    /// # use serde::Deserialize;
158    /// # #[derive(Deserialize)] struct Project { id: String, name: String }
159    /// let stream = client
160    ///     .query::<Project>("projects")
161    ///     .select_projection("jsonb_build_object('id', data->>'id', 'name', data->>'name')")
162    ///     .execute()
163    ///     .await?;
164    /// # Ok(())
165    /// # }
166    /// ```
167    ///
168    /// # Backward Compatibility
169    ///
170    /// If not specified, defaults to `SELECT data` (original behavior).
171    pub fn select_projection(mut self, projection_sql: impl Into<String>) -> Self {
172        self.custom_select = Some(projection_sql.into());
173        self
174    }
175
176    /// Set LIMIT clause to restrict result set size
177    ///
178    /// # Example
179    ///
180    /// ```no_run
181    /// // Requires: live Postgres connection via FraiseClient.
182    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
183    /// # use serde::Deserialize;
184    /// # #[derive(Deserialize)] struct Project { id: String }
185    /// let stream = client.query::<Project>("projects")
186    ///     .limit(10)
187    ///     .execute()
188    ///     .await?;
189    /// # Ok(())
190    /// # }
191    /// ```
192    pub const fn limit(mut self, count: usize) -> Self {
193        self.limit = Some(count);
194        self
195    }
196
197    /// Set OFFSET clause to skip first N rows
198    ///
199    /// # Example
200    ///
201    /// ```no_run
202    /// // Requires: live Postgres connection via FraiseClient.
203    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
204    /// # use serde::Deserialize;
205    /// # #[derive(Deserialize)] struct Project { id: String }
206    /// let stream = client.query::<Project>("projects")
207    ///     .limit(10)
208    ///     .offset(20)  // Skip first 20, return next 10
209    ///     .execute()
210    ///     .await?;
211    /// # Ok(())
212    /// # }
213    /// ```
214    pub const fn offset(mut self, count: usize) -> Self {
215        self.offset = Some(count);
216        self
217    }
218
219    /// Set chunk size (default: 256)
220    pub const fn chunk_size(mut self, size: usize) -> Self {
221        self.chunk_size = size;
222        self
223    }
224
225    /// Set maximum memory limit for buffered items (default: unbounded)
226    ///
227    /// When the estimated memory usage of buffered items exceeds this limit,
228    /// the stream will return `WireError::MemoryLimitExceeded` instead of additional items.
229    ///
230    /// Memory is estimated as: `items_buffered * 2048 bytes` (conservative for typical JSON).
231    ///
232    /// By default, `max_memory()` is None (unbounded), maintaining backward compatibility.
233    /// Only set if you need hard memory bounds.
234    ///
235    /// # Example
236    ///
237    /// ```no_run
238    /// // Requires: live Postgres connection via FraiseClient.
239    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
240    /// # use serde::Deserialize;
241    /// # #[derive(Deserialize)] struct Project { id: String }
242    /// let stream = client
243    ///     .query::<Project>("projects")
244    ///     .max_memory(500_000_000)  // 500 MB limit
245    ///     .execute()
246    ///     .await?;
247    /// # Ok(())
248    /// # }
249    /// ```
250    ///
251    /// # Interpretation
252    ///
253    /// If memory limit is exceeded:
254    /// - It indicates the consumer is too slow relative to data arrival
255    /// - The error is terminal (non-retriable) — retrying won't help
256    /// - Consider: increasing consumer throughput, reducing `chunk_size`, or removing limit
257    pub const fn max_memory(mut self, bytes: usize) -> Self {
258        self.max_memory = Some(bytes);
259        self
260    }
261
262    /// Set soft memory limit thresholds for progressive degradation
263    ///
264    /// Allows warning at a threshold before hitting hard limit.
265    /// Only applies if `max_memory()` is also set.
266    ///
267    /// # Parameters
268    ///
269    /// - `warn_threshold`: Percentage (0.0-1.0) at which to emit a warning
270    /// - `fail_threshold`: Percentage (0.0-1.0) at which to return error (must be > `warn_threshold`)
271    ///
272    /// # Example
273    ///
274    /// ```no_run
275    /// // Requires: live Postgres connection via FraiseClient.
276    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
277    /// # use serde::Deserialize;
278    /// # #[derive(Deserialize)] struct Project { id: String }
279    /// let stream = client
280    ///     .query::<Project>("projects")
281    ///     .max_memory(500_000_000)  // 500 MB hard limit
282    ///     .memory_soft_limits(0.80, 1.0)  // Warn at 80%, error at 100%
283    ///     .execute()
284    ///     .await?;
285    /// # Ok(())
286    /// # }
287    /// ```
288    ///
289    /// If only hard limit needed, skip this and just use `max_memory()`.
290    pub fn memory_soft_limits(mut self, warn_threshold: f32, fail_threshold: f32) -> Self {
291        // Validate thresholds
292        let warn = warn_threshold.clamp(0.0, 1.0);
293        let fail = fail_threshold.clamp(0.0, 1.0);
294
295        if warn < fail {
296            self.soft_limit_warn_threshold = Some(warn);
297            self.soft_limit_fail_threshold = Some(fail);
298        }
299        self
300    }
301
302    /// Enable or disable adaptive chunk sizing (default: enabled)
303    ///
304    /// Adaptive chunking automatically adjusts `chunk_size` based on channel occupancy:
305    /// - High occupancy (>80%): Decreases chunk size to reduce producer pressure
306    /// - Low occupancy (<20%): Increases chunk size to optimize batching efficiency
307    ///
308    /// Enabled by default for zero-configuration self-tuning.
309    /// Disable if you need fixed chunk sizes or encounter unexpected behavior.
310    ///
311    /// # Example
312    ///
313    /// ```no_run
314    /// // Requires: live Postgres connection via FraiseClient.
315    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
316    /// # use serde::Deserialize;
317    /// # #[derive(Deserialize)] struct Project { id: String }
318    /// let stream = client
319    ///     .query::<Project>("projects")
320    ///     .adaptive_chunking(false)  // Disable adaptive tuning
321    ///     .chunk_size(512)  // Use fixed size
322    ///     .execute()
323    ///     .await?;
324    /// # Ok(())
325    /// # }
326    /// ```
327    pub const fn adaptive_chunking(mut self, enabled: bool) -> Self {
328        self.enable_adaptive_chunking = enabled;
329        self
330    }
331
332    /// Override minimum chunk size for adaptive tuning (default: 16)
333    ///
334    /// Adaptive chunking will never decrease chunk size below this value.
335    /// Useful if you need minimum batching for performance.
336    ///
337    /// Only applies if adaptive chunking is enabled.
338    ///
339    /// # Example
340    ///
341    /// ```no_run
342    /// // Requires: live Postgres connection via FraiseClient.
343    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
344    /// # use serde::Deserialize;
345    /// # #[derive(Deserialize)] struct Project { id: String }
346    /// let stream = client
347    ///     .query::<Project>("projects")
348    ///     .adaptive_chunking(true)
349    ///     .adaptive_min_size(32)  // Don't go below 32 items per batch
350    ///     .execute()
351    ///     .await?;
352    /// # Ok(())
353    /// # }
354    /// ```
355    pub const fn adaptive_min_size(mut self, size: usize) -> Self {
356        self.adaptive_min_chunk_size = Some(size);
357        self
358    }
359
360    /// Override maximum chunk size for adaptive tuning (default: 1024)
361    ///
362    /// Adaptive chunking will never increase chunk size above this value.
363    /// Useful if you need memory bounds or latency guarantees.
364    ///
365    /// Only applies if adaptive chunking is enabled.
366    ///
367    /// # Example
368    ///
369    /// ```no_run
370    /// // Requires: live Postgres connection via FraiseClient.
371    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
372    /// # use serde::Deserialize;
373    /// # #[derive(Deserialize)] struct Project { id: String }
374    /// let stream = client
375    ///     .query::<Project>("projects")
376    ///     .adaptive_chunking(true)
377    ///     .adaptive_max_size(512)  // Cap at 512 items per batch
378    ///     .execute()
379    ///     .await?;
380    /// # Ok(())
381    /// # }
382    /// ```
383    pub const fn adaptive_max_size(mut self, size: usize) -> Self {
384        self.adaptive_max_chunk_size = Some(size);
385        self
386    }
387
388    /// Execute query and return typed stream
389    ///
390    /// Type T ONLY affects consumer-side deserialization at `poll_next()`.
391    /// SQL, filtering, ordering, and wire protocol are identical regardless of T.
392    ///
393    /// The returned stream supports pause/resume/stats for advanced stream control.
394    ///
395    /// # Examples
396    ///
397    /// With type-safe deserialization:
398    /// ```no_run
399    /// // Requires: live Postgres connection via FraiseClient.
400    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
401    /// # use serde::Deserialize;
402    /// # #[derive(Deserialize)] struct Project { id: String }
403    /// # use futures::stream::StreamExt;
404    /// let mut stream = client.query::<Project>("projects").execute().await?;
405    /// while let Some(result) = stream.next().await {
406    ///     let project: Project = result?;
407    /// }
408    /// # Ok(())
409    /// # }
410    /// ```
411    ///
412    /// With raw JSON (escape hatch):
413    /// ```no_run
414    /// // Requires: live Postgres connection via FraiseClient.
415    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
416    /// # use futures::stream::StreamExt;
417    /// let mut stream = client.query::<serde_json::Value>("projects").execute().await?;
418    /// while let Some(result) = stream.next().await {
419    ///     let json: serde_json::Value = result?;
420    /// }
421    /// # Ok(())
422    /// # }
423    /// ```
424    ///
425    /// With stream control:
426    /// ```no_run
427    /// // Requires: live Postgres connection via FraiseClient.
428    /// # async fn example(client: fraiseql_wire::FraiseClient) -> fraiseql_wire::Result<()> {
429    /// let mut stream = client.query::<serde_json::Value>("projects").execute().await?;
430    /// stream.pause().await?;  // Pause the stream
431    /// let stats = stream.stats();  // Get statistics
432    /// stream.resume().await?;  // Resume the stream
433    /// # Ok(())
434    /// # }
435    /// ```
436    ///
437    /// # Errors
438    ///
439    /// Returns [`WireError`] if SQL generation fails or the underlying streaming query
440    /// cannot be started on the connection.
441    pub async fn execute(self) -> Result<QueryStream<T>> {
442        let sql = self.build_sql()?;
443        tracing::debug!("executing query: {}", sql);
444
445        // Record query submission metrics
446        crate::metrics::counters::query_submitted(
447            &self.entity,
448            !self.sql_predicates.is_empty(),
449            self.rust_predicate.is_some(),
450            self.order_by.is_some(),
451        );
452
453        let stream = self
454            .client
455            .execute_query(
456                &sql,
457                self.chunk_size,
458                self.max_memory,
459                self.soft_limit_warn_threshold,
460                self.soft_limit_fail_threshold,
461            )
462            .await?;
463
464        // Create QueryStream with optional Rust predicate
465        Ok(QueryStream::new(stream, self.rust_predicate))
466    }
467
468    /// Build SQL query
469    fn build_sql(&self) -> Result<String> {
470        // Use custom SELECT clause if provided, otherwise default to "SELECT data"
471        let select_clause = if let Some(ref projection) = self.custom_select {
472            format!("SELECT {} as data", projection)
473        } else {
474            "SELECT data".to_string()
475        };
476
477        let mut sql = format!("{} FROM {}", select_clause, self.entity);
478
479        if !self.sql_predicates.is_empty() {
480            sql.push_str(" WHERE ");
481            sql.push_str(&self.sql_predicates.join(" AND "));
482        }
483
484        if let Some(ref order) = self.order_by {
485            sql.push_str(" ORDER BY ");
486            sql.push_str(order);
487        }
488
489        if let Some(limit) = self.limit {
490            sql.push_str(&format!(" LIMIT {}", limit));
491        }
492
493        if let Some(offset) = self.offset {
494            sql.push_str(&format!(" OFFSET {}", offset));
495        }
496
497        Ok(sql)
498    }
499}
500
501#[cfg(test)]
502mod tests;