use crate::client::FraiseClient;
#[allow(unused_imports)] use crate::error::WireError;
use crate::stream::QueryStream;
use crate::Result;
use serde::de::DeserializeOwned;
use serde_json::Value;
use std::marker::PhantomData;
/// Boxed, `Send` closure from a borrowed JSON [`Value`] to `bool`.
///
/// `QueryBuilder::where_rust` stores one of these and `QueryBuilder::execute`
/// hands it to `QueryStream::new` unchanged.
type RustPredicate = Box<dyn Fn(&Value) -> bool + Send>;
/// Fluent builder that assembles a `SELECT … FROM <entity>` statement and the
/// streaming options handed to [`FraiseClient`] when the query is executed.
///
/// `T` is the type carried by the resulting [`QueryStream`]; it defaults to
/// [`serde_json::Value`]. The builder does nothing until `.execute()` is
/// awaited, hence the `#[must_use]` marker.
#[must_use = "call .execute() to run the query"]
pub struct QueryBuilder<T = serde_json::Value>
where
    T: DeserializeOwned + Unpin + 'static,
{
    /// Client used to run the generated SQL.
    client: FraiseClient,
    /// Name interpolated verbatim after `FROM` in the generated SQL.
    entity: String,
    /// Raw SQL fragments; joined with ` AND ` to form the `WHERE` clause.
    sql_predicates: Vec<String>,
    /// Optional Rust closure passed to `QueryStream::new` on execution.
    rust_predicate: Option<RustPredicate>,
    /// Raw SQL appended after ` ORDER BY `, when set.
    order_by: Option<String>,
    /// Value emitted as ` LIMIT n`, when set.
    limit: Option<usize>,
    /// Value emitted as ` OFFSET n`, when set.
    offset: Option<usize>,
    /// Chunk size forwarded to `FraiseClient::execute_query`.
    chunk_size: usize,
    /// Optional memory bound forwarded to `FraiseClient::execute_query`
    /// (the setter's parameter is named `bytes`).
    max_memory: Option<usize>,
    /// Warn threshold, stored already clamped to `0.0..=1.0`.
    soft_limit_warn_threshold: Option<f32>,
    /// Fail threshold, stored already clamped to `0.0..=1.0`; always strictly
    /// greater than the warn threshold when both are set via the builder.
    soft_limit_fail_threshold: Option<f32>,
    /// Adaptive-chunking switch.
    // NOTE(review): this flag and the two bounds below are written by their
    // setters but are not read by any method in this file — confirm where
    // they are meant to be consumed.
    enable_adaptive_chunking: bool,
    /// Optional lower bound for adaptive chunk sizing.
    adaptive_min_chunk_size: Option<usize>,
    /// Optional upper bound for adaptive chunk sizing.
    adaptive_max_chunk_size: Option<usize>,
    /// Optional projection expression; replaces the default `SELECT data`.
    custom_select: Option<String>,
    /// Ties the builder to its output type `T` without storing a `T`.
    _phantom: PhantomData<T>,
}
impl<T: DeserializeOwned + Unpin + 'static> QueryBuilder<T> {
    /// Creates a builder for `entity` with no predicates, ordering or paging.
    ///
    /// Defaults: chunk size `256`, adaptive-chunking flag set to `true`, every
    /// optional setting `None`.
    pub(crate) fn new(client: FraiseClient, entity: impl Into<String>) -> Self {
        let entity = entity.into();
        Self {
            client,
            entity,
            sql_predicates: Vec::new(),
            rust_predicate: None,
            order_by: None,
            limit: None,
            offset: None,
            chunk_size: 256,
            max_memory: None,
            soft_limit_warn_threshold: None,
            soft_limit_fail_threshold: None,
            enable_adaptive_chunking: true,
            adaptive_min_chunk_size: None,
            adaptive_max_chunk_size: None,
            custom_select: None,
            _phantom: PhantomData,
        }
    }

    /// Adds a raw SQL predicate; all predicates are combined with `AND`.
    ///
    /// The text is inserted into the statement verbatim — no escaping or
    /// parameter binding happens here, so never pass untrusted input.
    pub fn where_sql(mut self, predicate: impl Into<String>) -> Self {
        let fragment: String = predicate.into();
        self.sql_predicates.push(fragment);
        self
    }

    /// Sets the Rust-side predicate, replacing any previously set one.
    ///
    /// The closure is not used while building SQL; it is handed to
    /// `QueryStream::new` when the query is executed.
    // NOTE(review): presumably the stream uses it to filter rows client-side —
    // confirm in the stream module.
    pub fn where_rust<F>(mut self, predicate: F) -> Self
    where
        F: Fn(&Value) -> bool + Send + 'static,
    {
        self.rust_predicate = Some(Box::new(predicate));
        self
    }

    /// Sets the `ORDER BY` expression (raw SQL, inserted verbatim), replacing
    /// any earlier one.
    pub fn order_by(mut self, order: impl Into<String>) -> Self {
        let ordering: String = order.into();
        self.order_by = Some(ordering);
        self
    }

    /// Replaces the default `SELECT data` with `SELECT <projection_sql> as data`.
    ///
    /// The projection is raw SQL and is inserted verbatim.
    pub fn select_projection(mut self, projection_sql: impl Into<String>) -> Self {
        let projection: String = projection_sql.into();
        self.custom_select = Some(projection);
        self
    }

    /// Emits ` LIMIT count` in the generated SQL.
    pub const fn limit(mut self, count: usize) -> Self {
        self.limit = Some(count);
        self
    }

    /// Emits ` OFFSET count` in the generated SQL.
    pub const fn offset(mut self, count: usize) -> Self {
        self.offset = Some(count);
        self
    }

    /// Overrides the chunk size forwarded to the client (default `256`).
    pub const fn chunk_size(mut self, size: usize) -> Self {
        self.chunk_size = size;
        self
    }

    /// Sets the memory bound, in bytes, forwarded to the client.
    pub const fn max_memory(mut self, bytes: usize) -> Self {
        self.max_memory = Some(bytes);
        self
    }

    /// Sets the soft-limit warn and fail thresholds.
    ///
    /// Both values are clamped to `0.0..=1.0`. They are stored only when the
    /// clamped warn value is strictly below the clamped fail value; otherwise
    /// the call leaves the current thresholds untouched.
    pub fn memory_soft_limits(mut self, warn_threshold: f32, fail_threshold: f32) -> Self {
        let (warn, fail) = (
            warn_threshold.clamp(0.0, 1.0),
            fail_threshold.clamp(0.0, 1.0),
        );
        // Keep the positive `<` test: `clamp` passes NaN through, and any
        // comparison with NaN is false, so NaN inputs are rejected here too.
        if warn < fail {
            self.soft_limit_warn_threshold = Some(warn);
            self.soft_limit_fail_threshold = Some(fail);
        }
        self
    }

    /// Sets the adaptive-chunking flag (it starts out `true`).
    pub const fn adaptive_chunking(mut self, enabled: bool) -> Self {
        self.enable_adaptive_chunking = enabled;
        self
    }

    /// Sets the lower bound for adaptive chunk sizing.
    pub const fn adaptive_min_size(mut self, size: usize) -> Self {
        self.adaptive_min_chunk_size = Some(size);
        self
    }

    /// Sets the upper bound for adaptive chunk sizing.
    pub const fn adaptive_max_size(mut self, size: usize) -> Self {
        self.adaptive_max_chunk_size = Some(size);
        self
    }

    /// Builds the SQL, records a submission metric, runs the query through the
    /// client and wraps the result in a [`QueryStream`].
    ///
    /// # Errors
    ///
    /// Propagates any error from SQL construction or from
    /// `FraiseClient::execute_query`.
    // NOTE(review): only the chunk size, memory bound and the two soft-limit
    // thresholds are forwarded; the adaptive-chunking settings are not passed
    // on from here — confirm whether that is intended.
    pub async fn execute(self) -> Result<QueryStream<T>> {
        let sql = self.build_sql()?;
        tracing::debug!("executing query: {}", sql);

        let has_sql_predicates = !self.sql_predicates.is_empty();
        let has_rust_predicate = self.rust_predicate.is_some();
        let has_order_by = self.order_by.is_some();
        crate::metrics::counters::query_submitted(
            &self.entity,
            has_sql_predicates,
            has_rust_predicate,
            has_order_by,
        );

        let raw_stream = self
            .client
            .execute_query(
                &sql,
                self.chunk_size,
                self.max_memory,
                self.soft_limit_warn_threshold,
                self.soft_limit_fail_threshold,
            )
            .await?;

        Ok(QueryStream::new(raw_stream, self.rust_predicate))
    }

    /// Renders the statement as
    /// `SELECT … FROM <entity> [WHERE …] [ORDER BY …] [LIMIT n] [OFFSET n]`.
    ///
    /// Every textual part (entity, projection, predicates, ordering) is
    /// interpolated verbatim. The current implementation always returns `Ok`.
    fn build_sql(&self) -> Result<String> {
        let select_clause = match &self.custom_select {
            Some(projection) => format!("SELECT {} as data", projection),
            None => "SELECT data".to_string(),
        };
        let mut sql = format!("{} FROM {}", select_clause, self.entity);

        if !self.sql_predicates.is_empty() {
            let conditions = self.sql_predicates.join(" AND ");
            sql += " WHERE ";
            sql += &conditions;
        }

        if let Some(order) = self.order_by.as_deref() {
            sql += " ORDER BY ";
            sql += order;
        }

        if let Some(limit) = self.limit {
            sql += &format!(" LIMIT {}", limit);
        }

        if let Some(offset) = self.offset {
            sql += &format!(" OFFSET {}", offset);
        }

        Ok(sql)
    }
}
#[cfg(test)]
mod tests;