use std::marker::PhantomData;
use sqlx::{
Executor,
FromRow,
Postgres,
};
use sqlxo_traits::{
Creatable,
CreateModel,
JoinNavigationModel,
QueryContext,
};
use crate::{
blocks::{
InsertHead,
SqlWriter,
},
select::{
self,
SelectionList,
},
Buildable,
ExecutablePlan,
FetchablePlan,
Planable,
Result,
};
/// Marker trait for insert builders.
///
/// It declares no methods of its own. An implementor must be a [`Buildable`]
/// for context `C` whose associated `Row` type equals the `Row` parameter and
/// whose associated `Plan` type implements [`Planable<C, Row>`].
///
/// `Row` defaults to the context's model type (`<C as QueryContext>::Model`)
/// and must be decodable from a Postgres row.
// NOTE(review): `dead_code` is allowed presumably because nothing in this
// crate names the trait as a bound yet — confirm before removing the allow.
#[allow(dead_code)]
pub trait BuildableInsertQuery<C, Row = <C as QueryContext>::Model>:
Buildable<C, Row = Row, Plan: Planable<C, Row>>
where
C: QueryContext,
Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
{
}
/// A fully configured `INSERT` ready to be executed or fetched.
///
/// `C` is the query context whose model is being created; `Row` is the type
/// that fetched rows are decoded into and defaults to the context's model.
pub struct InsertQueryPlan<
'a,
C: QueryContext,
Row = <C as QueryContext>::Model,
> where
C::Model: Creatable,
{
/// Target table name, written into the generated SQL as-is.
pub(crate) table: &'a str,
/// The payload to insert; it renders its own part of the statement via
/// `apply_inserts` and `append_relation_ctes`.
pub(crate) create_model: <C::Model as Creatable>::CreateModel,
/// Forwarded unchanged to `apply_inserts`.
// NOTE(review): the exact meaning of the marker field is defined by the
// `CreateModel` implementation — confirm there.
pub(crate) insert_marker_field: Option<&'static str>,
/// When `false`, `auto_join_paths` yields no joins for graph fetches.
pub(crate) auto_joins: bool,
/// Forwarded to `default_join_paths` when auto joins are enabled.
pub(crate) include_lazy_relations: bool,
/// Explicit column selection handed to `select::push_returning`; when set,
/// model fetches use the plain `INSERT` plus returning clause instead of
/// the graph query.
pub(crate) selection: Option<SelectionList<Row, select::SelectionColumn>>,
/// Zero-sized marker carrying the `Row` type parameter.
row: PhantomData<Row>,
}
impl<'a, C, Row> InsertQueryPlan<'a, C, Row>
where
    C: QueryContext,
    C::Model: Creatable,
{
    /// Builds the plain `INSERT` statement for this plan.
    ///
    /// The statement head comes from [`InsertHead`] for `self.table`; the
    /// create model then appends its own part through `apply_inserts`. No
    /// returning clause is added here — see [`Self::push_returning`].
    fn to_query_builder(&self) -> sqlx::QueryBuilder<'static, Postgres> {
        let mut writer = SqlWriter::new(InsertHead::new(self.table));
        self.create_model
            .apply_inserts(writer.query_builder_mut(), self.insert_marker_field);
        writer.into_builder()
    }

    /// Appends the returning clause for this plan to `qb`, delegating to
    /// `select::push_returning` with the table name and optional selection.
    fn push_returning(&self, qb: &mut sqlx::QueryBuilder<'static, Postgres>) {
        let selected_columns = self.selection.as_ref();
        select::push_returning(qb, self.table, selected_columns);
    }

    /// Builds the statement used by `execute`.
    ///
    /// The insert is wrapped in a CTE named `affected`, the create model may
    /// append further CTEs via `append_relation_ctes`, and the final `SELECT`
    /// reports the number of rows in `affected` as `__sqlxo_count`.
    fn to_execute_with_relations_query_builder(
        &self,
    ) -> sqlx::QueryBuilder<'static, Postgres> {
        let mut statement =
            sqlx::QueryBuilder::<Postgres>::new("WITH affected AS (INSERT INTO ");
        statement.push(self.table);
        self.create_model
            .apply_inserts(&mut statement, self.insert_marker_field);
        statement.push(" RETURNING *)");

        // Relation CTEs (if any) reference the insert result by its CTE name.
        self.create_model
            .append_relation_ctes(&mut statement, "affected");

        statement.push(" SELECT COUNT(*)::BIGINT AS __sqlxo_count FROM affected");
        statement
    }

    /// Returns the SQL text of the plain `INSERT` built by
    /// [`Self::to_query_builder`] (without a returning clause).
    ///
    /// Only compiled for tests or with the `test-utils` feature.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn sql(&self) -> String {
        use sqlx::Execute;

        self.to_query_builder().build().sql().to_string()
    }
}
#[async_trait::async_trait]
impl<'a, C, Row> ExecutablePlan<C> for InsertQueryPlan<'a, C, Row>
where
    C: QueryContext,
    C::Model: Creatable,
    Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
{
    /// Runs the insert (including any relation CTEs) and returns the number
    /// of rows reported by the `affected` CTE.
    ///
    /// # Errors
    ///
    /// Propagates any error from running the query or decoding the count row.
    async fn execute<'e, E>(&self, exec: E) -> Result<u64>
    where
        E: Executor<'e, Database = Postgres>,
    {
        /// Single-column row produced by the trailing `SELECT COUNT(*)`.
        #[derive(sqlx::FromRow)]
        struct CountRow {
            #[sqlx(rename = "__sqlxo_count")]
            count: i64,
        }

        let mut statement = self.to_execute_with_relations_query_builder();
        let CountRow { count } = statement
            .build_query_as::<CountRow>()
            .fetch_one(exec)
            .await?;

        // A negative count is clamped to zero before the lossless
        // conversion to `u64`.
        Ok(count.max(0).unsigned_abs())
    }
}
#[async_trait::async_trait]
impl<'a, C, Row> FetchablePlan<C, Row> for InsertQueryPlan<'a, C, Row>
where
    C: QueryContext,
    C::Model: Creatable,
    Row: Send
        + Sync
        + Unpin
        + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
        + InsertFetchRow<C>,
{
    /// Runs the insert and returns exactly one decoded row.
    ///
    /// The actual strategy is chosen by the `InsertFetchRow` implementation
    /// for `Row`; its result is returned unchanged.
    async fn fetch_one<'e, E>(&self, exec: E) -> Result<Row>
    where
        E: Executor<'e, Database = Postgres>,
    {
        <Row as InsertFetchRow<C>>::fetch_one(self, exec).await
    }

    /// Runs the insert and returns every decoded row, delegating to the
    /// `InsertFetchRow` implementation for `Row`.
    async fn fetch_all<'e, E>(&self, exec: E) -> Result<Vec<Row>>
    where
        E: Executor<'e, Database = Postgres>,
    {
        <Row as InsertFetchRow<C>>::fetch_all(self, exec).await
    }

    /// Runs the insert and returns the first decoded row, if any, delegating
    /// to the `InsertFetchRow` implementation for `Row`.
    async fn fetch_optional<'e, E>(&self, exec: E) -> Result<Option<Row>>
    where
        E: Executor<'e, Database = Postgres>,
    {
        <Row as InsertFetchRow<C>>::fetch_optional(self, exec).await
    }
}
/// Private dispatch trait that decides how an [`InsertQueryPlan`] fetches rows
/// of type `Self`.
///
/// The `FetchablePlan` implementation for `InsertQueryPlan` forwards each of
/// its three fetch methods to the method of the same name here.
#[async_trait::async_trait]
trait InsertFetchRow<C: QueryContext>: Sized
where
C::Model: Creatable,
{
/// Executes `plan` on `exec` and returns a single row.
async fn fetch_one<'a, 'e, E>(
plan: &InsertQueryPlan<'a, C, Self>,
exec: E,
) -> Result<Self>
where
E: Executor<'e, Database = Postgres>;
/// Executes `plan` on `exec` and returns all rows.
async fn fetch_all<'a, 'e, E>(
plan: &InsertQueryPlan<'a, C, Self>,
exec: E,
) -> Result<Vec<Self>>
where
E: Executor<'e, Database = Postgres>;
/// Executes `plan` on `exec` and returns the first row, or `None` when the
/// statement produced no rows.
async fn fetch_optional<'a, 'e, E>(
plan: &InsertQueryPlan<'a, C, Self>,
exec: E,
) -> Result<Option<Self>>
where
E: Executor<'e, Database = Postgres>;
}
/// Appends the `JOIN` clauses for one join path to `qb`.
///
/// The first segment joins against `base_table`; every later segment joins
/// against the alias introduced by the clause before it. Aliases are built by
/// concatenating the `alias_segment` of each descriptor along the path. When a
/// segment has a `through` descriptor, an extra join on the through table is
/// emitted first (aliased as the current prefix plus the through segment's
/// alias), and the segment's own join then hangs off that alias.
///
/// An empty path appends nothing.
fn push_join_path_inline(
    qb: &mut sqlx::QueryBuilder<'static, Postgres>,
    path: &sqlxo_traits::JoinPath,
    base_table: &str,
) {
    /// Renders a single `<JOIN> table AS "alias" ON "left"."lf" = "alias"."rf"`.
    fn join_clause(
        join: &str,
        table: impl std::fmt::Display,
        alias: &str,
        left: &str,
        left_field: impl std::fmt::Display,
        right_field: impl std::fmt::Display,
    ) -> String {
        format!(
            r#"{join}{table} AS "{alias}" ON "{left}"."{left_field}" = "{alias}"."{right_field}""#
        )
    }

    if path.is_empty() {
        return;
    }

    // `current_left` is the alias the next clause joins from;
    // `prefix` accumulates the alias segments of the path walked so far.
    let mut current_left = String::from(base_table);
    let mut prefix = String::new();

    for segment in path.segments() {
        let keyword = match segment.kind {
            sqlxo_traits::JoinKind::Inner => " INNER JOIN ",
            sqlxo_traits::JoinKind::Left => " LEFT JOIN ",
        };

        if let Some(through) = segment.descriptor.through {
            // The through alias does not extend `prefix`; only the segment's
            // own alias does.
            let through_alias = format!("{}{}", prefix, through.alias_segment);
            qb.push(join_clause(
                keyword,
                &through.table,
                &through_alias,
                &current_left,
                &through.left_field,
                &through.right_field,
            ));
            current_left = through_alias;
        }

        prefix.push_str(segment.descriptor.alias_segment);
        qb.push(join_clause(
            keyword,
            &segment.descriptor.right_table,
            &prefix,
            &current_left,
            &segment.descriptor.left_field,
            &segment.descriptor.right_field,
        ));
        current_left = prefix.clone();
    }
}
impl<'a, C> InsertQueryPlan<'a, C, C::Model>
where
    C: QueryContext,
    C::Model: Creatable + JoinNavigationModel,
{
    /// Returns the join paths to load alongside the inserted model.
    ///
    /// Empty when auto joins are disabled; otherwise the model's default join
    /// paths, with `include_lazy_relations` passed through.
    fn auto_join_paths(&self) -> Vec<sqlxo_traits::JoinPath> {
        if self.auto_joins {
            C::Model::default_join_paths(self.include_lazy_relations).into_vec()
        } else {
            Vec::new()
        }
    }

    /// Builds the statement that inserts the row and selects it back together
    /// with the columns of every path in `joins`.
    ///
    /// Shape: a CTE named `affected` wraps the insert, optional relation CTEs
    /// follow, then `SELECT "affected".*` plus relation dependency columns and
    /// aliased join columns, `FROM affected`, and finally the `JOIN` clauses.
    fn to_graph_query_builder(
        &self,
        joins: &[sqlxo_traits::JoinPath],
    ) -> sqlx::QueryBuilder<'static, Postgres> {
        let mut statement =
            sqlx::QueryBuilder::<Postgres>::new("WITH affected AS (INSERT INTO ");
        statement.push(self.table);
        self.create_model
            .apply_inserts(&mut statement, self.insert_marker_field);
        statement.push(" RETURNING *)");
        self.create_model
            .append_relation_ctes(&mut statement, "affected");

        // Projection: the inserted row first, then everything the joins add.
        statement.push(" SELECT \"affected\".*");
        self.create_model
            .append_relation_dependency_columns(&mut statement);
        for column in C::Model::collect_join_columns(Some(joins), "") {
            statement.push(", ").push(format!(
                r#""{}"."{}" AS "{}""#,
                column.table_alias, column.column, column.alias
            ));
        }

        statement.push(" FROM affected");
        joins
            .iter()
            .for_each(|path| push_join_path_inline(&mut statement, path, "affected"));
        statement
    }

    /// Decodes raw rows into models and fills in their navigations.
    ///
    /// Each row is decoded with `from_row` and then hydrated with
    /// `hydrate_navigations`. If the joins include collection joins (per
    /// `has_collection_joins`), the resulting models are passed through
    /// `merge_collection_rows` before being returned. An empty `joins` slice
    /// is passed to the model hooks as `None`.
    ///
    /// # Errors
    ///
    /// Returns the first decoding or hydration error encountered.
    fn hydrate_graph_rows(
        &self,
        rows: Vec<sqlx::postgres::PgRow>,
        joins: &[sqlxo_traits::JoinPath],
    ) -> Result<Vec<C::Model>> {
        let active_joins = (!joins.is_empty()).then_some(joins);

        let mut hydrated = Vec::with_capacity(rows.len());
        for pg_row in rows {
            let mut entity = C::Model::from_row(&pg_row)?;
            entity.hydrate_navigations(active_joins, &pg_row, "")?;
            hydrated.push(entity);
        }

        if C::Model::has_collection_joins(active_joins) {
            Ok(C::Model::merge_collection_rows(hydrated, active_joins))
        } else {
            Ok(hydrated)
        }
    }
}
/// Fallback strategy for any decodable row type: run the plain `INSERT` with
/// the plan's returning clause and decode the result directly into `Self`.
///
/// All methods are `default` so that a more specific implementation can
/// override them.
#[async_trait::async_trait]
impl<C, Row> InsertFetchRow<C> for Row
where
    C: QueryContext,
    C::Model: Creatable,
    Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
{
    /// Executes the insert and decodes exactly one returned row.
    default async fn fetch_one<'a, 'e, E>(
        plan: &InsertQueryPlan<'a, C, Self>,
        exec: E,
    ) -> Result<Self>
    where
        E: Executor<'e, Database = Postgres>,
    {
        let mut statement = plan.to_query_builder();
        plan.push_returning(&mut statement);
        let decoded = statement.build_query_as::<Self>().fetch_one(exec).await?;
        Ok(decoded)
    }

    /// Executes the insert and decodes every returned row.
    default async fn fetch_all<'a, 'e, E>(
        plan: &InsertQueryPlan<'a, C, Self>,
        exec: E,
    ) -> Result<Vec<Self>>
    where
        E: Executor<'e, Database = Postgres>,
    {
        let mut statement = plan.to_query_builder();
        plan.push_returning(&mut statement);
        let decoded = statement.build_query_as::<Self>().fetch_all(exec).await?;
        Ok(decoded)
    }

    /// Executes the insert and decodes the returned row, if there is one.
    default async fn fetch_optional<'a, 'e, E>(
        plan: &InsertQueryPlan<'a, C, Self>,
        exec: E,
    ) -> Result<Option<Self>>
    where
        E: Executor<'e, Database = Postgres>,
    {
        let mut statement = plan.to_query_builder();
        plan.push_returning(&mut statement);
        let decoded = statement
            .build_query_as::<Self>()
            .fetch_optional(exec)
            .await?;
        Ok(decoded)
    }
}
#[async_trait::async_trait]
impl<C> InsertFetchRow<C> for C::Model
where
C: QueryContext,
C::Model: Creatable + JoinNavigationModel + Clone,
{
async fn fetch_one<'a, 'e, E>(
plan: &InsertQueryPlan<'a, C, Self>,
exec: E,
) -> Result<Self>
where
E: Executor<'e, Database = Postgres>,
{
if plan.selection.is_some() {
let mut qb = plan.to_query_builder();
plan.push_returning(&mut qb);
return Ok(qb.build_query_as::<Self>().fetch_one(exec).await?);
}
let joins = plan.auto_join_paths();
let rows = plan
.to_graph_query_builder(&joins)
.build()
.fetch_all(exec)
.await?;
let mut models = plan.hydrate_graph_rows(rows, &joins)?;
if let Some(first) = models.first_mut() {
plan.create_model.apply_relation_payload(first);
}
models
.into_iter()
.next()
.ok_or_else(|| sqlx::Error::RowNotFound.into())
}
async fn fetch_all<'a, 'e, E>(
plan: &InsertQueryPlan<'a, C, Self>,
exec: E,
) -> Result<Vec<Self>>
where
E: Executor<'e, Database = Postgres>,
{
if plan.selection.is_some() {
let mut qb = plan.to_query_builder();
plan.push_returning(&mut qb);
return Ok(qb.build_query_as::<Self>().fetch_all(exec).await?);
}
let joins = plan.auto_join_paths();
let rows = plan
.to_graph_query_builder(&joins)
.build()
.fetch_all(exec)
.await?;
let mut models = plan.hydrate_graph_rows(rows, &joins)?;
for model in models.iter_mut() {
plan.create_model.apply_relation_payload(model);
}
Ok(models)
}
async fn fetch_optional<'a, 'e, E>(
plan: &InsertQueryPlan<'a, C, Self>,
exec: E,
) -> Result<Option<Self>>
where
E: Executor<'e, Database = Postgres>,
{
if plan.selection.is_some() {
let mut qb = plan.to_query_builder();
plan.push_returning(&mut qb);
return Ok(qb
.build_query_as::<Self>()
.fetch_optional(exec)
.await?);
}
let joins = plan.auto_join_paths();
let rows = plan
.to_graph_query_builder(&joins)
.build()
.fetch_all(exec)
.await?;
let mut models = plan.hydrate_graph_rows(rows, &joins)?;
if let Some(first) = models.first_mut() {
plan.create_model.apply_relation_payload(first);
}
Ok(models.into_iter().next())
}
}
// Marks `InsertQueryPlan` as a `Planable` plan for any decodable `Row`.
// The impl body is empty: nothing is overridden here.
impl<'a, C, Row> Planable<C, Row> for InsertQueryPlan<'a, C, Row>
where
C: QueryContext,
C::Model: Creatable,
Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
{
}
/// Builder that collects the pieces of an `INSERT` before it is turned into an
/// [`InsertQueryPlan`] by `build`.
///
/// `Row` is the type fetched rows will be decoded into; it defaults to the
/// context's model and can be changed with `take`.
pub struct InsertQueryBuilder<
'a,
C: QueryContext,
Row = <C as QueryContext>::Model,
> where
C::Model: Creatable,
{
/// Target table name; initialised from `C::TABLE` by `from_ctx`.
pub(crate) table: &'a str,
/// The payload to insert. `None` until `model` is called; `build` panics
/// if it is still `None`.
pub(crate) create_model: Option<<C::Model as Creatable>::CreateModel>,
/// Initialised from `Creatable::INSERT_MARKER_FIELD` and carried into the
/// plan unchanged.
pub(crate) insert_marker_field: Option<&'static str>,
/// Starts as `true`; cleared by `without_auto_joins`.
pub(crate) auto_joins: bool,
/// Starts as `false`; set by `include_lazy_relations`.
pub(crate) include_lazy_relations: bool,
/// Explicit column selection; `None` until `take` is called.
pub(crate) selection: Option<SelectionList<Row, select::SelectionColumn>>,
/// Zero-sized marker carrying the `Row` type parameter.
row: PhantomData<Row>,
}
impl<'a, C, Row> InsertQueryBuilder<'a, C, Row>
where
    C: QueryContext,
    C::Model: Creatable,
{
    /// Sets the payload to insert, replacing any previously supplied one.
    pub fn model(
        self,
        model: <C::Model as Creatable>::CreateModel,
    ) -> Self {
        Self {
            create_model: Some(model),
            ..self
        }
    }

    /// Disables automatic joins for the resulting plan.
    pub fn without_auto_joins(self) -> Self {
        Self {
            auto_joins: false,
            ..self
        }
    }

    /// Requests that lazy relations are included when the plan computes its
    /// default join paths.
    pub fn include_lazy_relations(self) -> Self {
        Self {
            include_lazy_relations: true,
            ..self
        }
    }
}
impl<'a, C, Row> Buildable<C> for InsertQueryBuilder<'a, C, Row>
where
    C: QueryContext,
    C::Model: Creatable,
    Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
{
    type Row = Row;
    type Plan = InsertQueryPlan<'a, C, Row>;

    /// Creates a builder with the context's defaults: table `C::TABLE`, the
    /// model's `INSERT_MARKER_FIELD`, auto joins enabled, lazy relations
    /// excluded, and neither a payload nor a selection set.
    fn from_ctx() -> Self {
        let insert_marker_field = <C::Model as Creatable>::INSERT_MARKER_FIELD;
        Self {
            table: C::TABLE,
            create_model: None,
            insert_marker_field,
            auto_joins: true,
            include_lazy_relations: false,
            selection: None,
            row: PhantomData,
        }
    }

    /// Turns the builder into an executable plan, moving every setting over
    /// unchanged.
    ///
    /// # Panics
    ///
    /// Panics if no payload was supplied through `.model()`.
    fn build(self) -> Self::Plan {
        let Self {
            table,
            create_model,
            insert_marker_field,
            auto_joins,
            include_lazy_relations,
            selection,
            row,
        } = self;

        InsertQueryPlan {
            table,
            create_model: create_model
                .expect("create model must be set with .model()"),
            insert_marker_field,
            auto_joins,
            include_lazy_relations,
            selection,
            row,
        }
    }
}
impl<'a, C, Row> InsertQueryBuilder<'a, C, Row>
where
    C: QueryContext,
    C::Model: Creatable,
    Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
{
    /// Switches the builder to fetch rows of type `NewRow` using an explicit
    /// selection.
    ///
    /// All other settings are carried over; any previous selection is
    /// discarded. The given list is converted with `expect_columns()` before
    /// being stored.
    // NOTE(review): `expect_columns` presumably rejects non-column entries —
    // confirm against `SelectionList`.
    pub fn take<NewRow>(
        self,
        selection: SelectionList<NewRow, select::SelectionEntry>,
    ) -> InsertQueryBuilder<'a, C, NewRow>
    where
        NewRow: Send
            + Sync
            + Unpin
            + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
    {
        let Self {
            table,
            create_model,
            insert_marker_field,
            auto_joins,
            include_lazy_relations,
            ..
        } = self;
        let columns = selection.expect_columns();

        InsertQueryBuilder {
            table,
            create_model,
            insert_marker_field,
            auto_joins,
            include_lazy_relations,
            selection: Some(columns),
            row: PhantomData,
        }
    }
}
// Marks `InsertQueryBuilder` as a `BuildableInsertQuery`. The trait declares
// no methods, so the impl body is empty.
impl<'a, C, Row> BuildableInsertQuery<C, Row> for InsertQueryBuilder<'a, C, Row>
where
C: QueryContext,
C::Model: Creatable,
Row: Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
{
}