mod data;
mod delete;
mod helpers;
mod insert;
mod select;
mod update;
pub use data::*;
pub use delete::*;
pub use helpers::*;
pub use insert::*;
pub use select::*;
pub use update::*;
use std::collections::HashSet;
use std::fmt::Write;
use crate::{AdapterDialect, DinocoValue};
pub trait QueryBuilder: AdapterDialect {
fn build_select(&self, stmt: &SelectStatement) -> Query {
let mut sql = String::with_capacity(256);
let mut params = Vec::new();
sql.push_str("SELECT ");
if stmt.select.is_empty() {
sql.push('*');
} else {
push_joined(&mut sql, &stmt.select, ", ", |buf, column| {
render_query_identifier_into(self, column, buf);
});
}
sql.push_str(" FROM ");
render_query_identifier_into(self, &stmt.from, &mut sql);
if !stmt.conditions.is_empty() {
sql.push_str(" WHERE ");
render_condition_group_into(self, &stmt.conditions, &mut params, " AND ", &mut sql);
}
if !stmt.order_by.is_empty() {
sql.push_str(" ORDER BY ");
push_joined(&mut sql, &stmt.order_by, ", ", |buf, (column, direction)| {
let dir = match direction {
OrderDirection::Asc => "ASC",
OrderDirection::Desc => "DESC",
};
let _ = write!(buf, "{} {}", self.identifier(column), dir);
});
}
append_limit_skip(self, &mut sql, &mut params, stmt.limit, stmt.skip);
(sql, params)
}
fn build_count(&self, stmt: &SelectStatement) -> Query {
let (inner_sql, params) = self.build_select(stmt);
let mut sql = String::with_capacity(inner_sql.len() + 64);
let _ = write!(sql, "SELECT COUNT(*) FROM ({inner_sql}) AS {}", self.identifier("__dinoco_count"));
(sql, params)
}
fn build_partitioned_select(
&self,
stmt: &SelectStatement,
partition_column: &str,
row_number_alias: &str,
) -> Query {
let mut sql = String::with_capacity(512);
let mut params = Vec::new();
sql.push_str("SELECT * FROM (SELECT ");
if stmt.select.is_empty() {
sql.push('*');
} else {
push_joined(&mut sql, &stmt.select, ", ", |buf, column| {
render_query_identifier_into(self, column, buf);
});
}
sql.push_str(", ROW_NUMBER() OVER (PARTITION BY ");
render_query_identifier_into(self, partition_column, &mut sql);
sql.push_str(" ORDER BY ");
if stmt.order_by.is_empty() {
if let Some(first_column) = stmt.select.first() {
render_query_identifier_into(self, first_column, &mut sql);
} else {
render_query_identifier_into(self, partition_column, &mut sql);
}
} else {
push_joined(&mut sql, &stmt.order_by, ", ", |buf, (column, direction)| {
let dir = match direction {
OrderDirection::Asc => "ASC",
OrderDirection::Desc => "DESC",
};
let _ = write!(buf, "{} {}", self.identifier(column), dir);
});
}
let _ = write!(sql, ") AS {} FROM ", self.identifier(row_number_alias));
render_query_identifier_into(self, &stmt.from, &mut sql);
if !stmt.conditions.is_empty() {
sql.push_str(" WHERE ");
render_condition_group_into(self, &stmt.conditions, &mut params, " AND ", &mut sql);
}
let _ = write!(sql, ") AS {} WHERE ", self.identifier("__dinoco_partitioned"));
let row_alias = self.identifier(row_number_alias);
match (stmt.skip, stmt.limit) {
(Some(skip), Some(limit)) => {
params.push(DinocoValue::Integer(skip as i64));
let lower = self.bind_param(params.len());
params.push(DinocoValue::Integer((skip + limit) as i64));
let upper = self.bind_param(params.len());
let _ = write!(sql, "{row_alias} > {lower} AND {row_alias} <= {upper}");
}
(Some(skip), None) => {
params.push(DinocoValue::Integer(skip as i64));
let lower = self.bind_param(params.len());
let _ = write!(sql, "{row_alias} > {lower}");
}
(None, Some(limit)) => {
params.push(DinocoValue::Integer(limit as i64));
let upper = self.bind_param(params.len());
let _ = write!(sql, "{row_alias} <= {upper}");
}
(None, None) => sql.push_str("1 = 1"),
}
sql.push_str(" ORDER BY ");
render_query_identifier_into(self, outer_partition_column(partition_column), &mut sql);
let _ = write!(sql, ", {}", row_alias);
(sql, params)
}
fn build_insert(&self, stmt: &InsertStatement) -> Query {
let mut sql = String::with_capacity(256);
let mut params = Vec::with_capacity(stmt.rows.iter().map(Vec::len).sum());
let _ = write!(sql, "INSERT INTO {} (", self.identifier(&stmt.table));
push_joined(&mut sql, &stmt.columns, ", ", |buf, col| {
render_query_identifier_into(self, col, buf);
});
sql.push_str(") VALUES ");
push_joined(&mut sql, &stmt.rows, ", ", |buf, row| {
buf.push('(');
push_joined(buf, row, ", ", |b, value| {
params.push(value.clone());
b.push_str(&self.bind_value(params.len(), value));
});
buf.push(')');
});
if !stmt.returning.is_empty() {
sql.push_str(" RETURNING ");
push_joined(&mut sql, &stmt.returning, ", ", |buf, column| {
render_query_identifier_into(self, column, buf);
});
}
(sql, params)
}
fn build_update(&self, stmt: &UpdateStatement) -> Query {
let mut sql = String::with_capacity(256);
let mut params = Vec::new();
sql.push_str("UPDATE ");
render_query_identifier_into(self, &stmt.table, &mut sql);
if stmt.target.is_some() {
sql.push_str(" AS ");
sql.push_str(&self.identifier("__dinoco_update"));
}
sql.push_str(" SET ");
if stmt.batches.is_empty() {
let assignments = merge_update_assignments(&stmt.sets);
push_joined(&mut sql, &assignments, ", ", |buf, (column, assignments)| {
render_query_identifier_into(self, column, buf);
buf.push_str(" = ");
render_update_assignment_chain_into(self, column, assignments, &mut params, buf);
});
if let Some(target) = &stmt.target {
sql.push_str(" WHERE EXISTS (SELECT 1 FROM (SELECT ");
push_joined(&mut sql, &target.primary_keys, ", ", |buf, column| {
render_query_identifier_into(self, column, buf);
});
sql.push_str(" FROM ");
render_query_identifier_into(self, &stmt.table, &mut sql);
if !stmt.conditions.is_empty() {
sql.push_str(" WHERE ");
render_condition_group_into(self, &stmt.conditions, &mut params, " AND ", &mut sql);
}
sql.push_str(" LIMIT 1) AS ");
let target_alias = self.identifier("__dinoco_target");
sql.push_str(&target_alias);
sql.push_str(" WHERE ");
push_joined(&mut sql, &target.primary_keys, " AND ", |buf, column| {
let _ = write!(
buf,
"{}.{} = {}.{}",
self.identifier("__dinoco_update"),
self.identifier(column),
target_alias,
self.identifier(column)
);
});
sql.push(')');
} else if !stmt.conditions.is_empty() {
sql.push_str(" WHERE ");
render_condition_group_into(self, &stmt.conditions, &mut params, " AND ", &mut sql);
}
return (sql, params);
}
let mut columns: Vec<&str> = Vec::new();
let mut seen_columns = HashSet::new();
for batch in &stmt.batches {
for (column, _) in &batch.values {
if seen_columns.insert(column.as_str()) {
columns.push(column.as_str());
}
}
}
push_joined(&mut sql, &columns, ", ", |buf, column| {
render_query_identifier_into(self, column, buf);
buf.push_str(" = CASE");
for batch in &stmt.batches {
if let Some((_, value)) = batch.values.iter().find(|(bc, _)| bc == column) {
buf.push_str(" WHEN ");
render_condition_group_into(self, &batch.conditions, &mut params, " AND ", buf);
params.push(value.clone());
let _ = write!(buf, " THEN {}", self.bind_value(params.len(), value));
}
}
buf.push_str(" ELSE ");
render_query_identifier_into(self, column, buf);
buf.push_str(" END");
});
sql.push_str(" WHERE ");
push_joined(&mut sql, &stmt.batches, " OR ", |buf, batch| {
buf.push('(');
render_condition_group_into(self, &batch.conditions, &mut params, " AND ", buf);
buf.push(')');
});
(sql, params)
}
fn build_delete(&self, stmt: &DeleteStatement) -> Query {
let mut sql = String::with_capacity(128);
let mut params = Vec::new();
sql.push_str("DELETE FROM ");
render_query_identifier_into(self, &stmt.table, &mut sql);
if !stmt.batches.is_empty() {
sql.push_str(" WHERE ");
push_joined(&mut sql, &stmt.batches, " OR ", |buf, batch| {
buf.push('(');
render_condition_group_into(self, batch, &mut params, " AND ", buf);
buf.push(')');
});
} else if !stmt.conditions.is_empty() {
sql.push_str(" WHERE ");
render_condition_group_into(self, &stmt.conditions, &mut params, " AND ", &mut sql);
}
(sql, params)
}
}
fn outer_partition_column(partition_column: &str) -> &str {
partition_column.rsplit('.').next().unwrap_or(partition_column)
}
impl<T: AdapterDialect> QueryBuilder for T {}
fn merge_update_assignments(assignments: &[crate::UpdateAssignment]) -> Vec<(String, Vec<crate::UpdateOperation>)> {
let mut grouped = Vec::<(String, Vec<crate::UpdateOperation>)>::new();
for assignment in assignments {
if let Some((_, operations)) = grouped.iter_mut().find(|(column, _)| column == &assignment.column) {
operations.push(assignment.operation.clone());
continue;
}
grouped.push((assignment.column.clone(), vec![assignment.operation.clone()]));
}
grouped
}
fn render_update_assignment_chain_into<D: AdapterDialect + ?Sized>(
dialect: &D,
column: &str,
assignments: &[crate::UpdateOperation],
params: &mut Vec<DinocoValue>,
buf: &mut String,
) {
let mut expr = dialect.identifier(column);
for assignment in assignments {
match assignment {
crate::UpdateOperation::Set(value) => {
params.push(value.clone());
expr = dialect.bind_value(params.len(), value);
}
crate::UpdateOperation::Increment(value) => {
params.push(value.clone());
expr = format!("({expr} + {})", dialect.bind_value(params.len(), value));
}
crate::UpdateOperation::Decrement(value) => {
params.push(value.clone());
expr = format!("({expr} - {})", dialect.bind_value(params.len(), value));
}
crate::UpdateOperation::Multiply(value) => {
params.push(value.clone());
expr = format!("({expr} * {})", dialect.bind_value(params.len(), value));
}
crate::UpdateOperation::Division(value) => {
params.push(value.clone());
let cast_expr = dialect.cast_numeric_for_division(&expr);
expr = format!("({cast_expr} / {})", dialect.bind_value(params.len(), value));
}
}
}
buf.push_str(&expr);
}