fraiseql_wire/client/query_builder.rs
1//! Query builder API
2//!
3//! Generic query builder that supports automatic JSON deserialization to target types.
4//!
5//! **IMPORTANT**: Type T is **consumer-side only**.
6//!
7//! Type T does NOT affect:
8//! - SQL generation (always `SELECT data FROM {entity}`)
9//! - Filtering (where_sql, where_rust, order_by)
10//! - Wire protocol (identical for all T)
11//!
12//! Type T ONLY affects:
13//! - Consumer-side deserialization at poll_next()
14//! - Error messages (type name included)
15
16use crate::client::FraiseClient;
17use crate::stream::QueryStream;
18use crate::Result;
19use serde::de::DeserializeOwned;
20use serde_json::Value;
21use std::marker::PhantomData;
22
/// Type alias for a Rust-side predicate function.
///
/// Predicates always receive a borrowed `serde_json::Value` (regardless of the
/// builder's type parameter `T`) and return `true` to keep the row.
/// `Send` is required because the predicate is handed to the stream machinery.
type RustPredicate = Box<dyn Fn(&Value) -> bool + Send>;
25
/// Generic query builder
///
/// The type parameter T controls consumer-side deserialization only.
/// Default type T = serde_json::Value for backward compatibility.
///
/// # Examples
///
/// Type-safe query (recommended):
/// ```ignore
/// use serde::Deserialize;
///
/// #[derive(Deserialize)]
/// struct Project {
/// id: String,
/// name: String,
/// }
///
/// let stream = client.query::<Project>("projects")
/// .where_sql("status='active'")
/// .execute()
/// .await?;
/// ```
///
/// Raw JSON query (debugging, forward compatibility):
/// ```ignore
/// let stream = client.query::<serde_json::Value>("projects")
/// .execute()
/// .await?;
/// ```
pub struct QueryBuilder<T: DeserializeOwned + Unpin + 'static = serde_json::Value> {
    /// Connection handle the query is executed on.
    client: FraiseClient,
    /// Target entity (table/view name), interpolated into `FROM {entity}`.
    entity: String,
    /// SQL WHERE predicates; joined with " AND " at build time.
    sql_predicates: Vec<String>,
    /// Optional consumer-side filter applied to streamed JSON values.
    rust_predicate: Option<RustPredicate>,
    /// Optional ORDER BY clause body (verbatim SQL fragment).
    order_by: Option<String>,
    /// Optional LIMIT value.
    limit: Option<usize>,
    /// Optional OFFSET value.
    offset: Option<usize>,
    /// Items per fetch batch (default 256; see `new`).
    chunk_size: usize,
    /// Optional hard cap on estimated buffered-item memory, in bytes.
    max_memory: Option<usize>,
    soft_limit_warn_threshold: Option<f32>, // Percentage (0.0-1.0) at which to warn
    soft_limit_fail_threshold: Option<f32>, // Percentage (0.0-1.0) at which to error
    /// Whether chunk size self-tunes based on channel occupancy.
    enable_adaptive_chunking: bool,
    /// Lower bound for adaptive chunk sizing (None = library default).
    adaptive_min_chunk_size: Option<usize>,
    /// Upper bound for adaptive chunk sizing (None = library default).
    adaptive_max_chunk_size: Option<usize>,
    custom_select: Option<String>, // Optional custom SELECT clause for SQL projection
    /// Marker so `T` can parameterize deserialization without being stored.
    _phantom: PhantomData<T>,
}
73
74impl<T: DeserializeOwned + Unpin + 'static> QueryBuilder<T> {
75 /// Create new query builder
76 pub(crate) fn new(client: FraiseClient, entity: impl Into<String>) -> Self {
77 Self {
78 client,
79 entity: entity.into(),
80 sql_predicates: Vec::new(),
81 rust_predicate: None,
82 order_by: None,
83 limit: None,
84 offset: None,
85 chunk_size: 256,
86 max_memory: None,
87 soft_limit_warn_threshold: None,
88 soft_limit_fail_threshold: None,
89 enable_adaptive_chunking: true, // Enabled by default
90 adaptive_min_chunk_size: None,
91 adaptive_max_chunk_size: None,
92 custom_select: None,
93 _phantom: PhantomData,
94 }
95 }
96
97 /// Add SQL WHERE clause predicate
98 ///
99 /// Type T does NOT affect SQL generation.
100 /// Multiple predicates are AND'ed together.
101 pub fn where_sql(mut self, predicate: impl Into<String>) -> Self {
102 self.sql_predicates.push(predicate.into());
103 self
104 }
105
106 /// Add Rust-side predicate
107 ///
108 /// Type T does NOT affect filtering.
109 /// Applied after SQL filtering, runs on streamed JSON values.
110 /// Predicates receive &serde_json::Value regardless of T.
111 pub fn where_rust<F>(mut self, predicate: F) -> Self
112 where
113 F: Fn(&Value) -> bool + Send + 'static,
114 {
115 self.rust_predicate = Some(Box::new(predicate));
116 self
117 }
118
119 /// Set ORDER BY clause
120 ///
121 /// Type T does NOT affect ordering.
122 pub fn order_by(mut self, order: impl Into<String>) -> Self {
123 self.order_by = Some(order.into());
124 self
125 }
126
127 /// Set a custom SELECT clause for SQL projection optimization
128 ///
129 /// When provided, this replaces the default `SELECT data` with a projection SQL
130 /// that filters fields at the database level, reducing network payload.
131 ///
132 /// The projection SQL will be wrapped as `SELECT {projection_sql} as data` to maintain
133 /// the hard invariant of a single `data` column.
134 ///
135 /// This feature enables architectural consistency with PostgreSQL optimization
136 /// and prepares for future performance improvements.
137 ///
138 /// # Arguments
139 ///
140 /// * `projection_sql` - PostgreSQL expression, typically from `jsonb_build_object()`
141 ///
142 /// # Example
143 ///
144 /// ```ignore
145 /// let stream = client
146 /// .query::<Project>("projects")
147 /// .select_projection("jsonb_build_object('id', data->>'id', 'name', data->>'name')")
148 /// .execute()
149 /// .await?;
150 /// ```
151 ///
152 /// # Backward Compatibility
153 ///
154 /// If not specified, defaults to `SELECT data` (original behavior).
155 pub fn select_projection(mut self, projection_sql: impl Into<String>) -> Self {
156 self.custom_select = Some(projection_sql.into());
157 self
158 }
159
160 /// Set LIMIT clause to restrict result set size
161 ///
162 /// # Example
163 ///
164 /// ```ignore
165 /// let stream = client.query::<Project>("projects")
166 /// .limit(10)
167 /// .execute()
168 /// .await?;
169 /// ```
170 pub fn limit(mut self, count: usize) -> Self {
171 self.limit = Some(count);
172 self
173 }
174
175 /// Set OFFSET clause to skip first N rows
176 ///
177 /// # Example
178 ///
179 /// ```ignore
180 /// let stream = client.query::<Project>("projects")
181 /// .limit(10)
182 /// .offset(20) // Skip first 20, return next 10
183 /// .execute()
184 /// .await?;
185 /// ```
186 pub fn offset(mut self, count: usize) -> Self {
187 self.offset = Some(count);
188 self
189 }
190
191 /// Set chunk size (default: 256)
192 pub fn chunk_size(mut self, size: usize) -> Self {
193 self.chunk_size = size;
194 self
195 }
196
197 /// Set maximum memory limit for buffered items (default: unbounded)
198 ///
199 /// When the estimated memory usage of buffered items exceeds this limit,
200 /// the stream will return `Error::MemoryLimitExceeded` instead of additional items.
201 ///
202 /// Memory is estimated as: `items_buffered * 2048 bytes` (conservative for typical JSON).
203 ///
204 /// By default, `max_memory()` is None (unbounded), maintaining backward compatibility.
205 /// Only set if you need hard memory bounds.
206 ///
207 /// # Example
208 ///
209 /// ```ignore
210 /// let stream = client
211 /// .query::<Project>("projects")
212 /// .max_memory(500_000_000) // 500 MB limit
213 /// .execute()
214 /// .await?;
215 /// ```
216 ///
217 /// # Interpretation
218 ///
219 /// If memory limit is exceeded:
220 /// - It indicates the consumer is too slow relative to data arrival
221 /// - The error is terminal (non-retriable) — retrying won't help
222 /// - Consider: increasing consumer throughput, reducing chunk_size, or removing limit
223 pub fn max_memory(mut self, bytes: usize) -> Self {
224 self.max_memory = Some(bytes);
225 self
226 }
227
228 /// Set soft memory limit thresholds for progressive degradation
229 ///
230 /// Allows warning at a threshold before hitting hard limit.
231 /// Only applies if `max_memory()` is also set.
232 ///
233 /// # Parameters
234 ///
235 /// - `warn_threshold`: Percentage (0.0-1.0) at which to emit a warning
236 /// - `fail_threshold`: Percentage (0.0-1.0) at which to return error (must be > warn_threshold)
237 ///
238 /// # Example
239 ///
240 /// ```ignore
241 /// let stream = client
242 /// .query::<Project>("projects")
243 /// .max_memory(500_000_000) // 500 MB hard limit
244 /// .memory_soft_limits(0.80, 1.0) // Warn at 80%, error at 100%
245 /// .execute()
246 /// .await?;
247 /// ```
248 ///
249 /// If only hard limit needed, skip this and just use `max_memory()`.
250 pub fn memory_soft_limits(mut self, warn_threshold: f32, fail_threshold: f32) -> Self {
251 // Validate thresholds
252 let warn = warn_threshold.clamp(0.0, 1.0);
253 let fail = fail_threshold.clamp(0.0, 1.0);
254
255 if warn < fail {
256 self.soft_limit_warn_threshold = Some(warn);
257 self.soft_limit_fail_threshold = Some(fail);
258 }
259 self
260 }
261
262 /// Enable or disable adaptive chunk sizing (default: enabled)
263 ///
264 /// Adaptive chunking automatically adjusts `chunk_size` based on channel occupancy:
265 /// - High occupancy (>80%): Decreases chunk size to reduce producer pressure
266 /// - Low occupancy (<20%): Increases chunk size to optimize batching efficiency
267 ///
268 /// Enabled by default for zero-configuration self-tuning.
269 /// Disable if you need fixed chunk sizes or encounter unexpected behavior.
270 ///
271 /// # Example
272 ///
273 /// ```ignore
274 /// let stream = client
275 /// .query::<Project>("projects")
276 /// .adaptive_chunking(false) // Disable adaptive tuning
277 /// .chunk_size(512) // Use fixed size
278 /// .execute()
279 /// .await?;
280 /// ```
281 pub fn adaptive_chunking(mut self, enabled: bool) -> Self {
282 self.enable_adaptive_chunking = enabled;
283 self
284 }
285
286 /// Override minimum chunk size for adaptive tuning (default: 16)
287 ///
288 /// Adaptive chunking will never decrease chunk size below this value.
289 /// Useful if you need minimum batching for performance.
290 ///
291 /// Only applies if adaptive chunking is enabled.
292 ///
293 /// # Example
294 ///
295 /// ```ignore
296 /// let stream = client
297 /// .query::<Project>("projects")
298 /// .adaptive_chunking(true)
299 /// .adaptive_min_size(32) // Don't go below 32 items per batch
300 /// .execute()
301 /// .await?;
302 /// ```
303 pub fn adaptive_min_size(mut self, size: usize) -> Self {
304 self.adaptive_min_chunk_size = Some(size);
305 self
306 }
307
308 /// Override maximum chunk size for adaptive tuning (default: 1024)
309 ///
310 /// Adaptive chunking will never increase chunk size above this value.
311 /// Useful if you need memory bounds or latency guarantees.
312 ///
313 /// Only applies if adaptive chunking is enabled.
314 ///
315 /// # Example
316 ///
317 /// ```ignore
318 /// let stream = client
319 /// .query::<Project>("projects")
320 /// .adaptive_chunking(true)
321 /// .adaptive_max_size(512) // Cap at 512 items per batch
322 /// .execute()
323 /// .await?;
324 /// ```
325 pub fn adaptive_max_size(mut self, size: usize) -> Self {
326 self.adaptive_max_chunk_size = Some(size);
327 self
328 }
329
330 /// Execute query and return typed stream
331 ///
332 /// Type T ONLY affects consumer-side deserialization at poll_next().
333 /// SQL, filtering, ordering, and wire protocol are identical regardless of T.
334 ///
335 /// The returned stream supports pause/resume/stats for advanced stream control.
336 ///
337 /// # Examples
338 ///
339 /// With type-safe deserialization:
340 /// ```ignore
341 /// let stream = client.query::<Project>("projects").execute().await?;
342 /// while let Some(result) = stream.next().await {
343 /// let project: Project = result?;
344 /// }
345 /// ```
346 ///
347 /// With raw JSON (escape hatch):
348 /// ```ignore
349 /// let stream = client.query::<serde_json::Value>("projects").execute().await?;
350 /// while let Some(result) = stream.next().await {
351 /// let json: Value = result?;
352 /// }
353 /// ```
354 ///
355 /// With stream control:
356 /// ```ignore
357 /// let mut stream = client.query::<serde_json::Value>("projects").execute().await?;
358 /// stream.pause().await?; // Pause the stream
359 /// let stats = stream.stats(); // Get statistics
360 /// stream.resume().await?; // Resume the stream
361 /// ```
362 pub async fn execute(self) -> Result<QueryStream<T>> {
363 let sql = self.build_sql()?;
364 tracing::debug!("executing query: {}", sql);
365
366 // Record query submission metrics
367 crate::metrics::counters::query_submitted(
368 &self.entity,
369 !self.sql_predicates.is_empty(),
370 self.rust_predicate.is_some(),
371 self.order_by.is_some(),
372 );
373
374 let stream = self
375 .client
376 .execute_query(
377 &sql,
378 self.chunk_size,
379 self.max_memory,
380 self.soft_limit_warn_threshold,
381 self.soft_limit_fail_threshold,
382 )
383 .await?;
384
385 // Create QueryStream with optional Rust predicate
386 Ok(QueryStream::new(stream, self.rust_predicate))
387 }
388
389 /// Build SQL query
390 fn build_sql(&self) -> Result<String> {
391 // Use custom SELECT clause if provided, otherwise default to "SELECT data"
392 let select_clause = if let Some(ref projection) = self.custom_select {
393 format!("SELECT {} as data", projection)
394 } else {
395 "SELECT data".to_string()
396 };
397
398 let mut sql = format!("{} FROM {}", select_clause, self.entity);
399
400 if !self.sql_predicates.is_empty() {
401 sql.push_str(" WHERE ");
402 sql.push_str(&self.sql_predicates.join(" AND "));
403 }
404
405 if let Some(ref order) = self.order_by {
406 sql.push_str(" ORDER BY ");
407 sql.push_str(order);
408 }
409
410 if let Some(limit) = self.limit {
411 sql.push_str(&format!(" LIMIT {}", limit));
412 }
413
414 if let Some(offset) = self.offset {
415 sql.push_str(&format!(" OFFSET {}", offset));
416 }
417
418 Ok(sql)
419 }
420}
421
#[cfg(test)]
mod tests {

    /// Assemble a SQL string with the same clause ordering as
    /// `QueryBuilder::build_sql`:
    /// `SELECT … FROM … [WHERE …] [ORDER BY …] [LIMIT n] [OFFSET n]`.
    ///
    /// A standalone mirror is used because constructing a real `QueryBuilder`
    /// requires a live `FraiseClient`. Routing every SQL-shape test through
    /// this one helper (instead of hand-concatenating strings per test)
    /// ensures all tests exercise a single, shared assembly path.
    fn build_test_sql_full(
        projection: Option<&str>,
        entity: &str,
        predicates: &[&str],
        order_by: Option<&str>,
        limit: Option<usize>,
        offset: Option<usize>,
    ) -> String {
        // Projection is wrapped as `… as data`, matching select_projection().
        let mut sql = match projection {
            Some(p) => format!("SELECT {} as data FROM {}", p, entity),
            None => format!("SELECT data FROM {}", entity),
        };
        if !predicates.is_empty() {
            sql.push_str(" WHERE ");
            sql.push_str(&predicates.join(" AND "));
        }
        if let Some(order) = order_by {
            sql.push_str(" ORDER BY ");
            sql.push_str(order);
        }
        if let Some(n) = limit {
            sql.push_str(&format!(" LIMIT {}", n));
        }
        if let Some(n) = offset {
            sql.push_str(&format!(" OFFSET {}", n));
        }
        sql
    }

    /// Convenience wrapper with the original helper's signature.
    fn build_test_sql(entity: &str, predicates: Vec<&str>, order_by: Option<&str>) -> String {
        build_test_sql_full(None, entity, &predicates, order_by, None, None)
    }

    #[test]
    fn test_build_sql_simple() {
        let sql = build_test_sql("user", vec![], None);
        assert_eq!(sql, "SELECT data FROM user");
    }

    #[test]
    fn test_build_sql_with_where() {
        let sql = build_test_sql("user", vec!["data->>'status' = 'active'"], None);
        assert_eq!(
            sql,
            "SELECT data FROM user WHERE data->>'status' = 'active'"
        );
    }

    #[test]
    fn test_build_sql_with_order() {
        let sql = build_test_sql("user", vec![], Some("data->>'name' ASC"));
        assert_eq!(sql, "SELECT data FROM user ORDER BY data->>'name' ASC");
    }

    #[test]
    fn test_build_sql_with_limit() {
        let sql = build_test_sql_full(None, "user", &[], None, Some(10), None);
        assert_eq!(sql, "SELECT data FROM user LIMIT 10");
    }

    #[test]
    fn test_build_sql_with_offset() {
        let sql = build_test_sql_full(None, "user", &[], None, None, Some(20));
        assert_eq!(sql, "SELECT data FROM user OFFSET 20");
    }

    #[test]
    fn test_build_sql_with_limit_and_offset() {
        let sql = build_test_sql_full(None, "user", &[], None, Some(10), Some(20));
        assert_eq!(sql, "SELECT data FROM user LIMIT 10 OFFSET 20");
    }

    #[test]
    fn test_build_sql_complete() {
        let sql = build_test_sql_full(
            None,
            "user",
            &["data->>'status' = 'active'"],
            Some("data->>'name' ASC"),
            Some(10),
            Some(20),
        );
        assert_eq!(
            sql,
            "SELECT data FROM user WHERE data->>'status' = 'active' ORDER BY data->>'name' ASC LIMIT 10 OFFSET 20"
        );
    }

    // Projection tests
    #[test]
    fn test_build_sql_default_select() {
        let sql = build_test_sql("users", vec![], None);
        assert!(sql.starts_with("SELECT data FROM"));
        assert_eq!(sql, "SELECT data FROM users");
    }

    #[test]
    fn test_projection_single_field() {
        let sql = build_test_sql_full(
            Some("jsonb_build_object('id', data->>'id')"),
            "users",
            &[],
            None,
            None,
            None,
        );
        assert!(sql.contains("as data"));
        assert!(sql.starts_with("SELECT jsonb_build_object("));
        assert!(sql.contains("FROM users"));
    }

    #[test]
    fn test_projection_multiple_fields() {
        let projection =
            "jsonb_build_object('id', data->>'id', 'name', data->>'name', 'email', data->>'email')";
        let sql = build_test_sql_full(Some(projection), "users", &[], None, None, None);
        assert!(sql.contains("as data FROM users"));
        assert!(sql.contains("jsonb_build_object("));
        assert!(sql.contains("'id'"));
        assert!(sql.contains("'name'"));
        assert!(sql.contains("'email'"));
    }

    #[test]
    fn test_projection_with_where_clause() {
        let sql = build_test_sql_full(
            Some("jsonb_build_object('id', data->>'id')"),
            "users",
            &["data->>'status' = 'active'"],
            None,
            None,
            None,
        );
        assert!(sql.contains("SELECT jsonb_build_object("));
        assert!(sql.contains("as data FROM users"));
        assert!(sql.contains("WHERE data->>'status' = 'active'"));
    }

    #[test]
    fn test_projection_with_order_by() {
        let sql = build_test_sql_full(
            Some("jsonb_build_object('id', data->>'id')"),
            "users",
            &[],
            Some("data->>'name' ASC"),
            None,
            None,
        );
        assert!(sql.contains("SELECT jsonb_build_object("));
        assert!(sql.contains("ORDER BY data->>'name' ASC"));
    }

    #[test]
    fn test_projection_with_limit() {
        let sql = build_test_sql_full(
            Some("jsonb_build_object('id', data->>'id')"),
            "users",
            &[],
            None,
            Some(1000),
            None,
        );
        assert!(sql.contains("as data FROM users"));
        assert!(sql.contains("LIMIT 1000"));
    }

    #[test]
    fn test_projection_with_offset() {
        let sql = build_test_sql_full(
            Some("jsonb_build_object('id', data->>'id')"),
            "users",
            &[],
            None,
            None,
            Some(500),
        );
        assert!(sql.contains("as data FROM users"));
        assert!(sql.contains("OFFSET 500"));
    }

    #[test]
    fn test_projection_full_pipeline() {
        let projection =
            "jsonb_build_object('user_id', data->>'user_id', 'event_type', data->>'event_type')";
        let sql = build_test_sql_full(
            Some(projection),
            "events",
            &["event_type IN ('purchase', 'view')"],
            Some("timestamp DESC"),
            Some(5000),
            None,
        );
        assert!(sql.contains("SELECT jsonb_build_object("));
        assert!(sql.contains("'user_id'"));
        assert!(sql.contains("'event_type'"));
        assert!(sql.contains("as data FROM events"));
        assert!(sql.contains("WHERE event_type IN ('purchase', 'view')"));
        assert!(sql.contains("ORDER BY timestamp DESC"));
        assert!(sql.contains("LIMIT 5000"));
    }

    // Stream pipeline integration tests
    #[test]
    fn test_typed_stream_with_value_type() {
        // Verify that TypedJsonStream can wrap a raw JSON stream
        use crate::stream::TypedJsonStream;
        use futures::stream;

        let values = vec![
            Ok(serde_json::json!({"id": "1", "name": "Alice"})),
            Ok(serde_json::json!({"id": "2", "name": "Bob"})),
        ];

        let json_stream = stream::iter(values);
        let typed_stream: TypedJsonStream<serde_json::Value> =
            TypedJsonStream::new(Box::new(json_stream));

        // This verifies the stream compiles and has correct type
        let _stream: Box<
            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Unpin,
        > = Box::new(typed_stream);
    }

    #[test]
    fn test_filtered_stream_with_typed_output() {
        // Verify that FilteredStream correctly filters before TypedJsonStream
        use crate::stream::{FilteredStream, TypedJsonStream};
        use futures::stream;

        let values = vec![
            Ok(serde_json::json!({"id": 1, "active": true})),
            Ok(serde_json::json!({"id": 2, "active": false})),
            Ok(serde_json::json!({"id": 3, "active": true})),
        ];

        let json_stream = stream::iter(values);
        let predicate = Box::new(|v: &serde_json::Value| v["active"].as_bool().unwrap_or(false));

        let filtered = FilteredStream::new(json_stream, predicate);
        let typed_stream: TypedJsonStream<serde_json::Value> =
            TypedJsonStream::new(Box::new(filtered));

        // This verifies the full pipeline compiles
        let _stream: Box<
            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Unpin,
        > = Box::new(typed_stream);
    }

    #[test]
    fn test_stream_pipeline_type_flow() {
        // Comprehensive test of stream type compatibility:
        // JsonStream (Result<Value>) → FilteredStream (Result<Value>) → TypedJsonStream<T> (Result<T>)
        use crate::stream::{FilteredStream, TypedJsonStream};
        use futures::stream;
        use serde::Deserialize;

        #[derive(Deserialize, Debug)]
        // Reason: test fixture struct used only for deserialization verification
        #[allow(dead_code)]
        struct TestUser {
            id: String,
            active: bool,
        }

        let values = vec![
            Ok(serde_json::json!({"id": "1", "active": true})),
            Ok(serde_json::json!({"id": "2", "active": false})),
        ];

        let json_stream = stream::iter(values);

        // Step 1: FilteredStream filters JSON values
        let predicate: Box<dyn Fn(&serde_json::Value) -> bool + Send> =
            Box::new(|v| v["active"].as_bool().unwrap_or(false));
        let filtered: Box<
            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Unpin,
        > = Box::new(FilteredStream::new(json_stream, predicate));

        // Step 2: TypedJsonStream deserializes to TestUser
        let typed: TypedJsonStream<TestUser> = TypedJsonStream::new(filtered);

        // This verifies type system is compatible:
        // - FilteredStream outputs Result<Value>
        // - TypedJsonStream<T> takes Box<dyn Stream<Item = Result<Value>>>
        // - TypedJsonStream<T> outputs Result<T>
        let _final_stream: Box<
            dyn futures::stream::Stream<Item = crate::Result<TestUser>> + Unpin,
        > = Box::new(typed);
    }
}
664}