use darling::ToTokens;
use proc_macro2::TokenStream;
use quote::{TokenStreamExt, quote};
use super::options::*;
/// Generator for the soft-delete methods (`delete` / `delete_in_op`) of a repository.
///
/// The struct only carries the pieces of repository configuration that the
/// `ToTokens` implementation interpolates into the generated code.
pub struct DeleteFn<'a> {
    /// Entity type taken by value in the generated `delete` / `delete_in_op` signatures.
    entity: &'a syn::Ident,
    /// Error type used as `Result<(), …>` in the generated signatures and for the
    /// `ConstraintViolation`, `Sqlx`, `ConcurrentModification` and
    /// `PostPersistHookError` variants referenced by the generated body.
    modify_error: syn::Ident,

    /// Table name spliced into the generated `UPDATE … SET … deleted = TRUE` statement.
    table_name: &'a str,
    /// Column set providing the variable bindings, the `SET` fragment and the
    /// query arguments for the generated `UPDATE`.
    columns: &'a Columns,
    /// Delete configuration; code is emitted only when it reports `is_soft()`.
    delete_option: &'a DeleteOption,

    /// Associated functions invoked (as `Self::<name>`) for nested entities before
    /// the parent row is updated.
    nested_delete_fn_names: Vec<syn::Ident>,
    /// Error type of the post-persist hook, if one is configured. Only its presence
    /// is inspected here: when `Some`, the generated code calls
    /// `execute_post_persist_hook` after persisting events.
    post_persist_error: Option<&'a syn::Type>,

    /// Snake-case repository name, used as the prefix of the `<repo>.delete` span name.
    #[cfg(feature = "instrument")]
    repo_name_snake: String,
}
impl<'a> DeleteFn<'a> {
pub fn from(opts: &'a RepositoryOptions) -> Self {
Self {
entity: opts.entity(),
modify_error: opts.modify_error(),
columns: &opts.columns,
table_name: opts.table_name(),
delete_option: &opts.delete,
nested_delete_fn_names: opts
.all_nested()
.map(|f| f.delete_nested_fn_name())
.collect(),
post_persist_error: opts.post_persist_hook.as_ref().map(|h| &h.error),
#[cfg(feature = "instrument")]
repo_name_snake: opts.repo_name_snake_case(),
}
}
}
impl ToTokens for DeleteFn<'_> {
fn to_tokens(&self, tokens: &mut TokenStream) {
if !self.delete_option.is_soft() {
return;
}
let entity = self.entity;
let modify_error = &self.modify_error;
let nested_deletes = self.nested_delete_fn_names.iter().map(|f| {
quote! {
Self::#f::<_, _, #modify_error>(op, &entity).await?;
}
});
let assignments = self
.columns
.variable_assignments_for_update(syn::parse_quote! { entity });
let column_updates = self.columns.sql_updates();
let query = format!(
"UPDATE {} SET {}{}deleted = TRUE WHERE id = $1",
self.table_name,
column_updates,
if column_updates.is_empty() { "" } else { ", " }
);
let args = self.columns.update_query_args();
#[cfg(feature = "instrument")]
let (instrument_attr, record_id, error_recording) = {
let entity_name = entity.to_string();
let repo_name = &self.repo_name_snake;
let span_name = format!("{}.delete", repo_name);
(
quote! {
#[tracing::instrument(name = #span_name, skip_all, fields(entity = #entity_name, id = tracing::field::Empty, error = tracing::field::Empty, exception.message = tracing::field::Empty, exception.type = tracing::field::Empty))]
},
quote! {
tracing::Span::current().record("id", tracing::field::debug(&entity.id));
},
quote! {
if let Err(ref e) = __result {
tracing::Span::current().record("error", true);
tracing::Span::current().record("exception.message", tracing::field::display(e));
tracing::Span::current().record("exception.type", std::any::type_name_of_val(e));
}
},
)
};
#[cfg(not(feature = "instrument"))]
let (instrument_attr, record_id, error_recording) = (quote! {}, quote! {}, quote! {});
let post_persist_check = if self.post_persist_error.is_some() {
quote! {
self.execute_post_persist_hook(op, &entity, entity.events().last_persisted(n_events)).await.map_err(#modify_error::PostPersistHookError)?;
}
} else {
quote! {}
};
tokens.append_all(quote! {
pub async fn delete(
&self,
entity: #entity
) -> Result<(), #modify_error> {
let mut op = self.begin_op().await?;
let res = self.delete_in_op(&mut op, entity).await?;
op.commit().await?;
Ok(res)
}
#instrument_attr
pub async fn delete_in_op<OP>(&self,
op: &mut OP,
mut entity: #entity
) -> Result<(), #modify_error>
where
OP: es_entity::AtomicOperation
{
let __result: Result<(), #modify_error> = async {
#(#nested_deletes)*
#assignments
#record_id
sqlx::query!(
#query,
#(#args),*
)
.execute(op.as_executor())
.await
.map_err(|e| match &e {
sqlx::Error::Database(db_err) if db_err.is_unique_violation() => {
#modify_error::ConstraintViolation {
column: Self::map_constraint_column(db_err.constraint()),
value: es_entity::extract_constraint_value(db_err.as_ref()),
inner: e,
}
}
_ => #modify_error::Sqlx(e),
})?;
let new_events = {
let events = Self::extract_events(&mut entity);
events.any_new()
};
if new_events {
let n_events = {
let events = Self::extract_events(&mut entity);
Self::extract_concurrent_modification(
self.persist_events(op, events).await,
#modify_error::ConcurrentModification,
)?
};
#post_persist_check
}
Ok(())
}.await;
#error_recording
__result
}
});
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use proc_macro2::Span;
    use syn::Ident;

    // NOTE(review): the expected streams below contain no `tracing::instrument`
    // attribute and no span recording, so these comparisons presumably assume the
    // `instrument` feature is disabled — confirm how the suite is run.

    /// With only an id column, the generated `UPDATE` sets nothing but
    /// `deleted = TRUE` and binds the id as its single argument.
    #[test]
    fn delete_fn() {
        let id_ty = Ident::new("EntityId", Span::call_site());
        let entity_ty = Ident::new("Entity", Span::call_site());

        let mut columns = Columns::default();
        columns.set_id_column(&id_ty);

        let generator = DeleteFn {
            modify_error: Ident::new("EntityModifyError", Span::call_site()),
            entity: &entity_ty,
            table_name: "entities",
            columns: &columns,
            delete_option: &DeleteOption::Soft,
            nested_delete_fn_names: Vec::new(),
            post_persist_error: None,
            #[cfg(feature = "instrument")]
            repo_name_snake: "test_repo".to_string(),
        };

        let generated = generator.to_token_stream();

        let expected = quote! {
            pub async fn delete(
                &self,
                entity: Entity
            ) -> Result<(), EntityModifyError> {
                let mut op = self.begin_op().await?;
                let res = self.delete_in_op(&mut op, entity).await?;
                op.commit().await?;
                Ok(res)
            }

            pub async fn delete_in_op<OP>(
                &self,
                op: &mut OP,
                mut entity: Entity
            ) -> Result<(), EntityModifyError>
            where
                OP: es_entity::AtomicOperation
            {
                let __result: Result<(), EntityModifyError> = async {
                    let id = &entity.id;

                    sqlx::query!(
                        "UPDATE entities SET deleted = TRUE WHERE id = $1",
                        id as &EntityId
                    )
                        .execute(op.as_executor())
                        .await
                        .map_err(|e| match &e {
                            sqlx::Error::Database(db_err) if db_err.is_unique_violation() => {
                                EntityModifyError::ConstraintViolation {
                                    column: Self::map_constraint_column(db_err.constraint()),
                                    value: es_entity::extract_constraint_value(db_err.as_ref()),
                                    inner: e,
                                }
                            }
                            _ => EntityModifyError::Sqlx(e),
                        })?;

                    let new_events = {
                        let events = Self::extract_events(&mut entity);
                        events.any_new()
                    };

                    if new_events {
                        let n_events = {
                            let events = Self::extract_events(&mut entity);
                            Self::extract_concurrent_modification(
                                self.persist_events(op, events).await,
                                EntityModifyError::ConcurrentModification,
                            )?
                        };
                    }

                    Ok(())
                }.await;

                __result
            }
        };

        assert_eq!(generated.to_string(), expected.to_string());
    }

    /// With an extra `name` column, the generated `UPDATE` also writes that
    /// column (`name = $2`) ahead of `deleted = TRUE` and binds it as a second
    /// argument.
    #[test]
    fn delete_fn_with_update_columns() {
        let id_ty = syn::parse_str("EntityId").unwrap();
        let entity_ty = Ident::new("Entity", Span::call_site());

        let name_column = Column::new(
            Ident::new("name", Span::call_site()),
            syn::parse_str("String").unwrap(),
        );
        let columns = Columns::new(&id_ty, [name_column]);

        let generator = DeleteFn {
            modify_error: Ident::new("EntityModifyError", Span::call_site()),
            entity: &entity_ty,
            table_name: "entities",
            columns: &columns,
            delete_option: &DeleteOption::Soft,
            nested_delete_fn_names: Vec::new(),
            post_persist_error: None,
            #[cfg(feature = "instrument")]
            repo_name_snake: "test_repo".to_string(),
        };

        let generated = generator.to_token_stream();

        let expected = quote! {
            pub async fn delete(
                &self,
                entity: Entity
            ) -> Result<(), EntityModifyError> {
                let mut op = self.begin_op().await?;
                let res = self.delete_in_op(&mut op, entity).await?;
                op.commit().await?;
                Ok(res)
            }

            pub async fn delete_in_op<OP>(
                &self,
                op: &mut OP,
                mut entity: Entity
            ) -> Result<(), EntityModifyError>
            where
                OP: es_entity::AtomicOperation
            {
                let __result: Result<(), EntityModifyError> = async {
                    let id = &entity.id;
                    let name = &entity.name;

                    sqlx::query!(
                        "UPDATE entities SET name = $2, deleted = TRUE WHERE id = $1",
                        id as &EntityId,
                        name as &String
                    )
                        .execute(op.as_executor())
                        .await
                        .map_err(|e| match &e {
                            sqlx::Error::Database(db_err) if db_err.is_unique_violation() => {
                                EntityModifyError::ConstraintViolation {
                                    column: Self::map_constraint_column(db_err.constraint()),
                                    value: es_entity::extract_constraint_value(db_err.as_ref()),
                                    inner: e,
                                }
                            }
                            _ => EntityModifyError::Sqlx(e),
                        })?;

                    let new_events = {
                        let events = Self::extract_events(&mut entity);
                        events.any_new()
                    };

                    if new_events {
                        let n_events = {
                            let events = Self::extract_events(&mut entity);
                            Self::extract_concurrent_modification(
                                self.persist_events(op, events).await,
                                EntityModifyError::ConcurrentModification,
                            )?
                        };
                    }

                    Ok(())
                }.await;

                __result
            }
        };

        assert_eq!(generated.to_string(), expected.to_string());
    }
}