use crate::client::FraiseClient;
#[allow(unused_imports)] use crate::error::WireError;
use crate::stream::QueryStream;
use crate::Result;
use serde::de::DeserializeOwned;
use serde_json::Value;
use std::marker::PhantomData;
/// Boxed, sendable predicate evaluated against a borrowed JSON [`Value`];
/// returns `true` or `false` for that value.
type RustPredicate = Box<dyn Fn(&Value) -> bool + Send>;
/// Fluent builder for a streaming query against a single entity.
///
/// Configuration methods consume and return the builder; nothing is sent
/// until `execute` is called. `T` is the item type of the resulting
/// `QueryStream` and defaults to raw JSON values.
#[must_use = "call .execute() to run the query"]
pub struct QueryBuilder<T: DeserializeOwned + Unpin + 'static = serde_json::Value> {
    /// Client used by `execute` to run the generated SQL.
    client: FraiseClient,
    /// Name interpolated verbatim after `FROM`.
    entity: String,
    /// Raw SQL fragments combined with `AND` in the `WHERE` clause.
    sql_predicates: Vec<String>,
    /// Optional predicate handed to `QueryStream::new` on execution.
    rust_predicate: Option<RustPredicate>,
    /// Raw expression appended after `ORDER BY`, if any.
    order_by: Option<String>,
    /// Value emitted as `LIMIT`, if any.
    limit: Option<usize>,
    /// Value emitted as `OFFSET`, if any.
    offset: Option<usize>,
    /// Chunk size forwarded to the client when the query is executed.
    chunk_size: usize,
    /// Optional memory cap forwarded to the client (named `bytes` by its setter).
    max_memory: Option<usize>,
    /// Warn threshold as a fraction in `[0.0, 1.0]`; set by `memory_soft_limits`.
    soft_limit_warn_threshold: Option<f32>,
    /// Fail threshold as a fraction in `[0.0, 1.0]`; set by `memory_soft_limits`.
    soft_limit_fail_threshold: Option<f32>,
    /// Flag toggled by `adaptive_chunking`.
    enable_adaptive_chunking: bool,
    /// Lower bound recorded by `adaptive_min_size`.
    adaptive_min_chunk_size: Option<usize>,
    /// Upper bound recorded by `adaptive_max_size`.
    adaptive_max_chunk_size: Option<usize>,
    /// Projection expression emitted as `SELECT <expr> as data` when set.
    custom_select: Option<String>,
    /// Ties the builder to its output type without storing a `T`.
    _phantom: PhantomData<T>,
}
impl<T: DeserializeOwned + Unpin + 'static> QueryBuilder<T> {
    /// Creates a builder that queries `entity` through `client`.
    ///
    /// The builder starts with no predicates, ordering, paging, projection,
    /// or memory limits. The chunk size defaults to 256 and adaptive chunking
    /// is enabled.
    pub(crate) fn new(client: FraiseClient, entity: impl Into<String>) -> Self {
        Self {
            client,
            entity: entity.into(),
            sql_predicates: Vec::new(),
            rust_predicate: None,
            order_by: None,
            limit: None,
            offset: None,
            chunk_size: 256,
            max_memory: None,
            soft_limit_warn_threshold: None,
            soft_limit_fail_threshold: None,
            enable_adaptive_chunking: true,
            adaptive_min_chunk_size: None,
            adaptive_max_chunk_size: None,
            custom_select: None,
            _phantom: PhantomData,
        }
    }

    /// Adds a raw SQL predicate to the `WHERE` clause.
    ///
    /// Predicates accumulate across calls and are combined with `AND`. When
    /// more than one predicate is present each is wrapped in parentheses, so a
    /// predicate containing `OR` keeps its own grouping.
    ///
    /// The text is inserted into the statement verbatim; it is not escaped.
    pub fn where_sql(mut self, predicate: impl Into<String>) -> Self {
        self.sql_predicates.push(predicate.into());
        self
    }

    /// Sets the Rust-side predicate passed to the resulting stream.
    ///
    /// A later call replaces the predicate set by an earlier one. The
    /// predicate is handed to `QueryStream::new` by [`Self::execute`];
    /// presumably it is evaluated against each row's JSON value there.
    pub fn where_rust<F>(mut self, predicate: F) -> Self
    where
        F: Fn(&Value) -> bool + Send + 'static,
    {
        self.rust_predicate = Some(Box::new(predicate));
        self
    }

    /// Sets the raw `ORDER BY` expression, replacing any previous one.
    ///
    /// The text is inserted into the statement verbatim.
    pub fn order_by(mut self, order: impl Into<String>) -> Self {
        self.order_by = Some(order.into());
        self
    }

    /// Replaces the default `SELECT data` with `SELECT <projection_sql> as data`.
    ///
    /// The text is inserted into the statement verbatim.
    pub fn select_projection(mut self, projection_sql: impl Into<String>) -> Self {
        self.custom_select = Some(projection_sql.into());
        self
    }

    /// Sets the `LIMIT` emitted in the generated SQL.
    pub const fn limit(mut self, count: usize) -> Self {
        self.limit = Some(count);
        self
    }

    /// Sets the `OFFSET` emitted in the generated SQL.
    pub const fn offset(mut self, count: usize) -> Self {
        self.offset = Some(count);
        self
    }

    /// Sets the chunk size forwarded to the client on execution.
    pub const fn chunk_size(mut self, size: usize) -> Self {
        self.chunk_size = size;
        self
    }

    /// Sets the memory cap, in bytes, forwarded to the client on execution.
    pub const fn max_memory(mut self, bytes: usize) -> Self {
        self.max_memory = Some(bytes);
        self
    }

    /// Sets the soft memory thresholds forwarded to the client on execution.
    ///
    /// Both values are clamped to `[0.0, 1.0]`. The pair is stored only when
    /// the clamped warn threshold is strictly below the clamped fail
    /// threshold; otherwise (including when either value is NaN) the call
    /// leaves the builder unchanged without reporting an error.
    pub fn memory_soft_limits(mut self, warn_threshold: f32, fail_threshold: f32) -> Self {
        let warn = warn_threshold.clamp(0.0, 1.0);
        let fail = fail_threshold.clamp(0.0, 1.0);
        if warn < fail {
            self.soft_limit_warn_threshold = Some(warn);
            self.soft_limit_fail_threshold = Some(fail);
        }
        self
    }

    /// Enables or disables the adaptive chunking flag.
    ///
    /// NOTE(review): `execute` in this impl does not forward this flag to the
    /// client; confirm where it is meant to be consumed.
    pub const fn adaptive_chunking(mut self, enabled: bool) -> Self {
        self.enable_adaptive_chunking = enabled;
        self
    }

    /// Records the minimum chunk size for adaptive chunking.
    ///
    /// NOTE(review): `execute` in this impl does not forward this value to the
    /// client; confirm where it is meant to be consumed.
    pub const fn adaptive_min_size(mut self, size: usize) -> Self {
        self.adaptive_min_chunk_size = Some(size);
        self
    }

    /// Records the maximum chunk size for adaptive chunking.
    ///
    /// NOTE(review): `execute` in this impl does not forward this value to the
    /// client; confirm where it is meant to be consumed.
    pub const fn adaptive_max_size(mut self, size: usize) -> Self {
        self.adaptive_max_chunk_size = Some(size);
        self
    }

    /// Builds the SQL, submits it through the client, and returns the stream.
    ///
    /// Emits a debug log line with the SQL and records a `query_submitted`
    /// metric before contacting the client. The chunk size, memory cap, and
    /// soft-limit thresholds are forwarded to the client; the Rust predicate,
    /// if any, is handed to the returned stream.
    ///
    /// # Errors
    ///
    /// Propagates any error from SQL construction or from the client's
    /// `execute_query` call.
    pub async fn execute(self) -> Result<QueryStream<T>> {
        let sql = self.build_sql()?;
        tracing::debug!("executing query: {}", sql);
        crate::metrics::counters::query_submitted(
            &self.entity,
            !self.sql_predicates.is_empty(),
            self.rust_predicate.is_some(),
            self.order_by.is_some(),
        );
        let stream = self
            .client
            .execute_query(
                &sql,
                self.chunk_size,
                self.max_memory,
                self.soft_limit_warn_threshold,
                self.soft_limit_fail_threshold,
            )
            .await?;
        Ok(QueryStream::new(stream, self.rust_predicate))
    }

    /// Renders the builder state as a single SQL statement.
    ///
    /// Clauses are emitted in the order `SELECT`, `FROM`, `WHERE`,
    /// `ORDER BY`, `LIMIT`, `OFFSET`; absent settings produce no clause.
    /// As written this never returns `Err`.
    fn build_sql(&self) -> Result<String> {
        let select_clause = match &self.custom_select {
            Some(projection) => format!("SELECT {} as data", projection),
            None => "SELECT data".to_string(),
        };
        let mut sql = format!("{} FROM {}", select_clause, self.entity);

        match self.sql_predicates.as_slice() {
            [] => {}
            // A lone predicate needs no grouping; emit it untouched.
            [single] => {
                sql.push_str(" WHERE ");
                sql.push_str(single);
            }
            // `AND` binds tighter than `OR`, so each predicate is wrapped to
            // keep `a OR b` + `c` meaning `(a OR b) AND (c)`.
            several => {
                let grouped: Vec<String> =
                    several.iter().map(|p| format!("({})", p)).collect();
                sql.push_str(" WHERE ");
                sql.push_str(&grouped.join(" AND "));
            }
        }

        if let Some(order) = &self.order_by {
            sql.push_str(" ORDER BY ");
            sql.push_str(order);
        }
        if let Some(limit) = self.limit {
            sql.push_str(&format!(" LIMIT {}", limit));
        }
        if let Some(offset) = self.offset {
            sql.push_str(&format!(" OFFSET {}", offset));
        }
        Ok(sql)
    }
}
#[cfg(test)]
mod tests {
    // The SQL-shape tests below assemble their strings locally (through
    // `build_test_sql` or plain concatenation); they do not call into
    // `QueryBuilder`. The stream tests only check that the stream adapters
    // compose at the type level.

    /// Assembles `SELECT data FROM <entity>` with optional `WHERE` (predicates
    /// joined by ` AND `) and `ORDER BY` parts.
    fn build_test_sql(entity: &str, predicates: Vec<&str>, order_by: Option<&str>) -> String {
        let mut statement = String::from("SELECT data FROM ");
        statement.push_str(entity);
        if !predicates.is_empty() {
            statement += " WHERE ";
            statement += &predicates.join(" AND ");
        }
        if let Some(ordering) = order_by {
            statement += " ORDER BY ";
            statement += ordering;
        }
        statement
    }

    #[test]
    fn test_build_sql_simple() {
        let sql = build_test_sql("user", Vec::new(), None);
        assert_eq!(sql, "SELECT data FROM user");
    }

    #[test]
    fn test_build_sql_with_where() {
        let predicates = vec!["data->>'status' = 'active'"];
        let sql = build_test_sql("user", predicates, None);
        assert_eq!(
            sql,
            "SELECT data FROM user WHERE data->>'status' = 'active'"
        );
    }

    #[test]
    fn test_build_sql_with_order() {
        let ordering = Some("data->>'name' ASC");
        let sql = build_test_sql("user", Vec::new(), ordering);
        assert_eq!(sql, "SELECT data FROM user ORDER BY data->>'name' ASC");
    }

    #[test]
    fn test_build_sql_with_limit() {
        let sql = ["SELECT data FROM user", " LIMIT 10"].concat();
        assert_eq!(sql, "SELECT data FROM user LIMIT 10");
    }

    #[test]
    fn test_build_sql_with_offset() {
        let sql = ["SELECT data FROM user", " OFFSET 20"].concat();
        assert_eq!(sql, "SELECT data FROM user OFFSET 20");
    }

    #[test]
    fn test_build_sql_with_limit_and_offset() {
        let sql = ["SELECT data FROM user", " LIMIT 10", " OFFSET 20"].concat();
        assert_eq!(sql, "SELECT data FROM user LIMIT 10 OFFSET 20");
    }

    #[test]
    fn test_build_sql_complete() {
        let sql = [
            "SELECT data FROM user",
            " WHERE data->>'status' = 'active'",
            " ORDER BY data->>'name' ASC",
            " LIMIT 10",
            " OFFSET 20",
        ]
        .concat();
        assert_eq!(
            sql,
            "SELECT data FROM user WHERE data->>'status' = 'active' ORDER BY data->>'name' ASC LIMIT 10 OFFSET 20"
        );
    }

    #[test]
    fn test_build_sql_default_select() {
        let sql = build_test_sql("users", Vec::new(), None);
        assert!(sql.starts_with("SELECT data FROM"));
        assert_eq!(sql, "SELECT data FROM users");
    }

    #[test]
    fn test_projection_single_field() {
        let sql = String::from("SELECT jsonb_build_object('id', data->>'id') as data FROM users");
        assert!(sql.contains("as data"));
        assert!(sql.starts_with("SELECT jsonb_build_object("));
        assert!(sql.contains("FROM users"));
    }

    #[test]
    fn test_projection_multiple_fields() {
        let projection =
            "jsonb_build_object('id', data->>'id', 'name', data->>'name', 'email', data->>'email')";
        let sql = ["SELECT ", projection, " as data FROM users"].concat();
        assert!(sql.contains("as data FROM users"));
        assert!(sql.contains("jsonb_build_object("));
        assert!(sql.contains("'id'"));
        assert!(sql.contains("'name'"));
        assert!(sql.contains("'email'"));
    }

    #[test]
    fn test_projection_with_where_clause() {
        let projection = "jsonb_build_object('id', data->>'id')";
        let sql = format!(
            "SELECT {} as data FROM users{}",
            projection, " WHERE data->>'status' = 'active'"
        );
        assert!(sql.contains("SELECT jsonb_build_object("));
        assert!(sql.contains("as data FROM users"));
        assert!(sql.contains("WHERE data->>'status' = 'active'"));
    }

    #[test]
    fn test_projection_with_order_by() {
        let projection = "jsonb_build_object('id', data->>'id')";
        let sql = format!(
            "SELECT {} as data FROM users{}",
            projection, " ORDER BY data->>'name' ASC"
        );
        assert!(sql.contains("SELECT jsonb_build_object("));
        assert!(sql.contains("ORDER BY data->>'name' ASC"));
    }

    #[test]
    fn test_projection_with_limit() {
        let projection = "jsonb_build_object('id', data->>'id')";
        let sql = format!("SELECT {} as data FROM users{}", projection, " LIMIT 1000");
        assert!(sql.contains("as data FROM users"));
        assert!(sql.contains("LIMIT 1000"));
    }

    #[test]
    fn test_projection_with_offset() {
        let projection = "jsonb_build_object('id', data->>'id')";
        let sql = format!("SELECT {} as data FROM users{}", projection, " OFFSET 500");
        assert!(sql.contains("as data FROM users"));
        assert!(sql.contains("OFFSET 500"));
    }

    #[test]
    fn test_projection_full_pipeline() {
        let projection =
            "jsonb_build_object('user_id', data->>'user_id', 'event_type', data->>'event_type')";
        let sql = [
            "SELECT ",
            projection,
            " as data FROM events",
            " WHERE event_type IN ('purchase', 'view')",
            " ORDER BY timestamp DESC",
            " LIMIT 5000",
        ]
        .concat();
        assert!(sql.contains("SELECT jsonb_build_object("));
        assert!(sql.contains("'user_id'"));
        assert!(sql.contains("'event_type'"));
        assert!(sql.contains("as data FROM events"));
        assert!(sql.contains("WHERE event_type IN ('purchase', 'view')"));
        assert!(sql.contains("ORDER BY timestamp DESC"));
        assert!(sql.contains("LIMIT 5000"));
    }

    // Compile-time check: a JSON stream wrapped in `TypedJsonStream<Value>`
    // can be boxed as an `Unpin` stream of `crate::Result<Value>`.
    #[test]
    fn test_typed_stream_with_value_type() {
        use crate::stream::TypedJsonStream;
        use futures::stream;

        let rows = vec![
            Ok(serde_json::json!({"id": "1", "name": "Alice"})),
            Ok(serde_json::json!({"id": "2", "name": "Bob"})),
        ];
        let source = stream::iter(rows);
        let typed: TypedJsonStream<serde_json::Value> = TypedJsonStream::new(Box::new(source));
        let _stream: Box<
            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Unpin,
        > = Box::new(typed);
    }

    // Compile-time check: `FilteredStream` output can feed `TypedJsonStream`.
    #[test]
    fn test_filtered_stream_with_typed_output() {
        use crate::stream::{FilteredStream, TypedJsonStream};
        use futures::stream;

        let rows = vec![
            Ok(serde_json::json!({"id": 1, "active": true})),
            Ok(serde_json::json!({"id": 2, "active": false})),
            Ok(serde_json::json!({"id": 3, "active": true})),
        ];
        let source = stream::iter(rows);
        let keep_active =
            Box::new(|v: &serde_json::Value| v["active"].as_bool().unwrap_or(false));
        let only_active = FilteredStream::new(source, keep_active);
        let typed: TypedJsonStream<serde_json::Value> =
            TypedJsonStream::new(Box::new(only_active));
        let _stream: Box<
            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Unpin,
        > = Box::new(typed);
    }

    // Compile-time check of the full chain: JSON rows -> filter -> typed
    // stream whose items are a user-defined `Deserialize` struct.
    #[test]
    fn test_stream_pipeline_type_flow() {
        use crate::stream::{FilteredStream, TypedJsonStream};
        use futures::stream;
        use serde::Deserialize;

        #[derive(Deserialize, Debug)]
        #[allow(dead_code)]
        struct TestUser {
            id: String,
            active: bool,
        }

        let rows = vec![
            Ok(serde_json::json!({"id": "1", "active": true})),
            Ok(serde_json::json!({"id": "2", "active": false})),
        ];
        let source = stream::iter(rows);
        let keep_active: Box<dyn Fn(&serde_json::Value) -> bool + Send> =
            Box::new(|v| v["active"].as_bool().unwrap_or(false));
        let only_active: Box<
            dyn futures::stream::Stream<Item = crate::Result<serde_json::Value>> + Send + Unpin,
        > = Box::new(FilteredStream::new(source, keep_active));
        let typed_users: TypedJsonStream<TestUser> = TypedJsonStream::new(only_active);
        let _final_stream: Box<
            dyn futures::stream::Stream<Item = crate::Result<TestUser>> + Unpin,
        > = Box::new(typed_users);
    }
}