use proc_macro2::TokenStream;
use quote::{format_ident, quote};
use syn::parse::Parser as _;
use syn::{Ident, ItemTrait, LitStr, TraitItem};
/// Collects the column names listed in `#[version_history(sensitive = [...])]` attributes.
///
/// Attributes whose path is not `version_history` are ignored. Names are returned in the
/// order they appear, across all matching attributes.
///
/// # Errors
///
/// Returns a `syn::Error` when a `version_history` attribute contains a key other than
/// `sensitive`, when the value of `sensitive` does not parse as an array expression, or when
/// an array element is not a string literal.
fn parse_version_history_sensitive(attrs: &[syn::Attribute]) -> syn::Result<Vec<String>> {
    let mut sensitive = Vec::<String>::new();
    let relevant = attrs
        .iter()
        .filter(|candidate| candidate.path().is_ident("version_history"));
    for attr in relevant {
        attr.parse_nested_meta(|meta| {
            // `sensitive` is the only key this attribute understands.
            if !meta.path.is_ident("sensitive") {
                return Err(meta.error(
                    "unknown version_history key; expected `sensitive = [\"col\", ...]`",
                ));
            }
            let list: syn::ExprArray = meta.value()?.parse()?;
            for element in &list.elems {
                if let syn::Expr::Lit(syn::ExprLit {
                    lit: syn::Lit::Str(name),
                    ..
                }) = element
                {
                    sensitive.push(name.value());
                } else {
                    // Point the diagnostic at the offending element itself.
                    return Err(syn::Error::new_spanned(
                        element,
                        "sensitive column names must be string literals (e.g. `sensitive = [\"my_col\"]`)",
                    ));
                }
            }
            Ok(())
        })?;
    }
    Ok(sensitive)
}
use crate::model::infer_table_name;
/// Converts a `CamelCase` name into `snake_case`.
///
/// An underscore is inserted before every uppercase character except one in the first
/// position, and the uppercase character is replaced by its lowercase form. Every uppercase
/// character counts as a word start, so `"HTTPServer"` becomes `"h_t_t_p_server"`.
///
/// Lowercasing is Unicode-aware, matching the Unicode-aware uppercase detection: `"ÉcoleÉlève"`
/// becomes `"école_élève"`. Output for pure-ASCII input is unaffected by this.
fn to_snake_case(name: &str) -> String {
    // Leave a little room for the underscores that get inserted.
    let mut result = String::with_capacity(name.len() + 4);
    for (i, ch) in name.chars().enumerate() {
        if ch.is_uppercase() {
            if i > 0 {
                result.push('_');
            }
            // `to_ascii_lowercase` would leave non-ASCII uppercase letters untouched even
            // though `is_uppercase` recognised them; `to_lowercase` may yield several chars.
            result.extend(ch.to_lowercase());
        } else {
            result.push(ch);
        }
    }
    result
}
/// Parsed arguments of the `#[repository(...)]` attribute, as produced by `parse_repo_args`.
#[allow(clippy::struct_excessive_bools)]
struct RepoConfig {
/// Model type: the first bare identifier in the argument list that is not a known key.
model_name: Ident,
/// Table name from `table = "..."`, or the result of `infer_table_name` for the model.
table_name: String,
/// Hooks type from `hooks = Type`, if given.
hooks_type: Option<Ident>,
/// Value of `commit_hooks = <bool>`; `true` is rejected by the parser unless `hooks` is set.
commit_hooks: bool,
/// String from `api = "/path"`, if given.
api_path: Option<String>,
/// Policy type from `policy = Type`, if given.
policy_type: Option<Ident>,
/// Scope type from `scope = Type`, if given.
scope_type: Option<Ident>,
/// Identifier from `cursor_key = field`, stored as text.
cursor_key: Option<String>,
/// Path from `cursor_key_type = Type`, if given.
cursor_key_type: Option<syn::Path>,
/// Set by the bare `soft_delete` flag. Derived queries then filter on `deleted_at IS NULL`
/// and derived deletes update `deleted_at` instead of removing rows.
soft_delete: bool,
/// Set by the bare `tenant_scoped` flag. Derived queries then filter on `tenant_id` unless
/// the repository's `across_tenants` field is set.
tenant_scoped: bool,
/// Set by the bare `no_upsert_trait` flag.
no_upsert_trait: bool,
/// Set by the bare `searchable` flag.
searchable: bool,
/// Value of `versioned = <bool>`.
versioned: bool,
/// Set by the bare `no_versioned_record_impl` flag.
no_versioned_record_impl: bool,
/// Set by the bare `primary_reads` flag. The read route is then initialised to
/// `ReadRoute::Primary` rather than `ReadRoute::from_state(state)`.
primary_reads: bool,
}
/// Parses the argument list of `#[repository(...)]` into a [`RepoConfig`].
///
/// Known keys are matched first. Any other single-identifier argument is taken as the model
/// name, but only while no model name has been recorded yet. When `table` is omitted, the
/// table name is obtained from `infer_table_name`.
///
/// # Errors
///
/// Returns a `syn::Error` when an argument is neither a known key nor the (first) model
/// name, when a value fails to parse, when no model name is present, or when
/// `commit_hooks = true` is given without `hooks = Type`.
#[allow(clippy::too_many_lines)]
fn parse_repo_args(attr: TokenStream) -> syn::Result<RepoConfig> {
    let mut model_name: Option<Ident> = None;
    let mut table_name: Option<String> = None;
    let mut hooks_type: Option<Ident> = None;
    let mut api_path: Option<String> = None;
    let mut policy_type: Option<Ident> = None;
    let mut scope_type: Option<Ident> = None;
    let mut cursor_key: Option<String> = None;
    let mut cursor_key_type: Option<syn::Path> = None;
    let (mut commit_hooks, mut versioned) = (false, false);
    let (mut soft_delete, mut tenant_scoped, mut no_upsert_trait) = (false, false, false);
    let (mut searchable, mut no_versioned_record_impl, mut primary_reads) = (false, false, false);

    syn::meta::parser(|meta| {
        // Only single-identifier paths can be keys or the model name.
        let key = meta.path.get_ident().map(ToString::to_string);
        match key.as_deref() {
            // `key = value` arguments.
            Some("hooks") => hooks_type = Some(meta.value()?.parse::<Ident>()?),
            Some("commit_hooks") => commit_hooks = meta.value()?.parse::<syn::LitBool>()?.value,
            Some("table") => table_name = Some(meta.value()?.parse::<LitStr>()?.value()),
            Some("api") => api_path = Some(meta.value()?.parse::<LitStr>()?.value()),
            Some("policy") => policy_type = Some(meta.value()?.parse::<Ident>()?),
            Some("scope") => scope_type = Some(meta.value()?.parse::<Ident>()?),
            Some("cursor_key") => {
                cursor_key = Some(meta.value()?.parse::<Ident>()?.to_string());
            }
            Some("cursor_key_type") => {
                cursor_key_type = Some(meta.value()?.parse::<syn::Path>()?);
            }
            Some("versioned") => versioned = meta.value()?.parse::<syn::LitBool>()?.value,
            // Bare flags.
            Some("soft_delete") => soft_delete = true,
            Some("tenant_scoped") => tenant_scoped = true,
            Some("no_upsert_trait") => no_upsert_trait = true,
            Some("searchable") => searchable = true,
            Some("no_versioned_record_impl") => no_versioned_record_impl = true,
            Some("primary_reads") => primary_reads = true,
            // The first identifier that is not a key names the model.
            Some(_) if model_name.is_none() => model_name = meta.path.get_ident().cloned(),
            _ => {
                return Err(meta.error(
                    "expected model name, table = \"...\", hooks = Type, commit_hooks = true, api = \"/path\", policy = Type, scope = Type, cursor_key = field, cursor_key_type = Type, soft_delete, tenant_scoped, no_upsert_trait, searchable, versioned = true, no_versioned_record_impl, or primary_reads",
                ));
            }
        }
        Ok(())
    })
    .parse2(attr)?;

    let Some(model_name) = model_name else {
        return Err(syn::Error::new(
            proc_macro2::Span::call_site(),
            "expected model name: #[repository(ModelName)]",
        ));
    };
    if commit_hooks && hooks_type.is_none() {
        return Err(syn::Error::new(
            proc_macro2::Span::call_site(),
            "commit_hooks = true requires hooks = Type",
        ));
    }
    let table_name = table_name.unwrap_or_else(|| infer_table_name(&model_name));

    Ok(RepoConfig {
        model_name,
        table_name,
        hooks_type,
        commit_hooks,
        api_path,
        policy_type,
        scope_type,
        cursor_key,
        cursor_key_type,
        soft_delete,
        tenant_scoped,
        no_upsert_trait,
        searchable,
        versioned,
        no_versioned_record_impl,
        primary_reads,
    })
}
/// A repository method name decomposed into a derived-query description.
struct DerivedQuery {
    /// Operation keyword: one of `find`, `count`, `delete` or `exists`.
    prefix: String,
    /// Field names taken from the method name, in order of appearance.
    fields: Vec<String>,
    /// `"and"` or `"or"`: how the method name joins its fields (`"and"` for a single field).
    #[allow(dead_code)]
    combinator: String,
}

/// Parses a method name of the form `<prefix>_by_<field>[_and_<field>...]` or
/// `<prefix>_by_<field>[_or_<field>...]`.
///
/// Returns `None` when the name
/// - does not start with `find`, `count`, `delete` or `exists` followed by `_by_`,
/// - mixes `_and_` with `_or_`, or
/// - contains a field segment that is empty or starts with a digit (e.g. `find_by_`,
///   `find_by_a_and_`, `delete_by_2fa`). Such a segment cannot be turned into an identifier,
///   so it is rejected here instead of failing later during code generation.
fn parse_query_name(name: &str) -> Option<DerivedQuery> {
    const PREFIXES: [&str; 4] = ["find", "count", "delete", "exists"];

    let (prefix, rest) = PREFIXES
        .iter()
        .find_map(|p| name.strip_prefix(*p).map(|rest| (*p, rest)))?;
    let rest = rest.strip_prefix("_by_")?;

    let (separator, combinator) = match (rest.contains("_and_"), rest.contains("_or_")) {
        // Mixed combinators are ambiguous without precedence rules.
        (true, true) => return None,
        (false, true) => ("_or_", "or"),
        // A single field has no separator; splitting then yields exactly that field.
        _ => ("_and_", "and"),
    };

    let fields: Vec<String> = rest.split(separator).map(String::from).collect();
    let is_identifier_like =
        |field: &String| !field.is_empty() && !field.starts_with(|c: char| c.is_ascii_digit());
    if !fields.iter().all(is_identifier_like) {
        return None;
    }

    Some(DerivedQuery {
        prefix: prefix.to_string(),
        fields,
        combinator: combinator.to_string(),
    })
}
/// Emits the body of a derived query method that starts from `query_source`.
///
/// * `query` - parsed method name; its prefix selects the operation, its fields become
///   equality predicates against the method parameters of the same name.
/// * `table_ident` - Diesel table module used to qualify columns.
/// * `model_name` - type loaded by `find` queries.
/// * `soft_delete` - when set, reads additionally filter on `deleted_at IS NULL` and `delete`
///   becomes an update that sets `deleted_at` on rows where it is still null.
/// * `query_source` - expression the filters are chained onto.
/// * `string_fields` - names of parameters whose value is first passed through
///   `::autumn_web::encryption::encode_derived_query_param`; the column is compared against
///   the encoded value.
///
/// Predicates of an `and` query are emitted as one `.filter(..)` call each. Predicates of an
/// `or` query are joined with `.or(..)` inside a single `.filter(..)`, so that
/// `find_by_a_or_b` matches rows where either column matches while the tenant and
/// soft-delete filters (separate `.filter` calls) still apply to every row.
///
/// The emitted code uses Diesel query-builder methods (`filter`, `eq`, `or`, ...) and
/// therefore expects the Diesel prelude to be imported where it is expanded.
///
/// An unknown prefix yields a `compile_error!` invocation instead of a query.
#[allow(clippy::too_many_lines)]
fn generate_derived_query_for_source(
    query: &DerivedQuery,
    table_ident: &Ident,
    model_name: &Ident,
    soft_delete: bool,
    query_source: &TokenStream,
    string_fields: &std::collections::HashSet<String>,
) -> TokenStream {
    let table_name_str = table_ident.to_string();
    let mut encode_lets: Vec<TokenStream> = Vec::new();
    let mut predicates: Vec<TokenStream> = Vec::with_capacity(query.fields.len());

    for field_name in &query.fields {
        // The column and the method parameter share the same identifier.
        let field = format_ident!("{field_name}");
        if string_fields.contains(field_name) {
            let enc_ident = format_ident!("__autumn_q_{field}");
            encode_lets.push(quote! {
                let #enc_ident = ::autumn_web::encryption::encode_derived_query_param(
                    #table_name_str, #field_name, &#field,
                )
                .map_err(|__e| ::autumn_web::AutumnError::internal_server_error_msg(
                    __e.to_string(),
                ))?;
            });
            predicates.push(quote! { #table_ident::#field.eq(&#enc_ident) });
        } else {
            predicates.push(quote! { #table_ident::#field.eq(&#field) });
        }
    }

    let filters: Vec<TokenStream> = if query.combinator == "or" {
        // Chained `.filter` calls are ANDed together, so alternatives must live in one
        // expression: `a.or(b).or(c)`.
        predicates
            .into_iter()
            .reduce(|lhs, rhs| quote! { #lhs.or(#rhs) })
            .map(|any_of| quote! { .filter(#any_of) })
            .into_iter()
            .collect()
    } else {
        predicates
            .into_iter()
            .map(|predicate| quote! { .filter(#predicate) })
            .collect()
    };

    let soft_delete_filter = if soft_delete {
        quote! { .filter(#table_ident::deleted_at.is_null()) }
    } else {
        quote! {}
    };

    match query.prefix.as_str() {
        "find" => {
            quote! {
                #(#encode_lets)*
                let mut conn = self.__autumn_acquire_read_conn().await?;
                #query_source
                    #(#filters)*
                    #soft_delete_filter
                    .load::<#model_name>(&mut conn)
                    .await
                    .map_err(::autumn_web::AutumnError::from)
            }
        }
        "count" => {
            quote! {
                #(#encode_lets)*
                let mut conn = self.__autumn_acquire_read_conn().await?;
                #query_source
                    #(#filters)*
                    #soft_delete_filter
                    .count()
                    .get_result::<i64>(&mut conn)
                    .await
                    .map_err(::autumn_web::AutumnError::from)
            }
        }
        "delete" => {
            if soft_delete {
                // Soft delete: stamp `deleted_at` on rows that are not yet deleted.
                quote! {
                    #(#encode_lets)*
                    let __now = ::autumn_web::reexports::chrono::Utc::now().naive_utc();
                    let mut conn = self.__autumn_acquire_conn().await?;
                    ::autumn_web::reexports::diesel::update(
                        #query_source #(#filters)* .filter(#table_ident::deleted_at.is_null())
                    )
                    .set(#table_ident::deleted_at.eq(::core::option::Option::Some(__now)))
                    .execute(&mut conn)
                    .await
                    .map_err(::autumn_web::AutumnError::from)?;
                    Ok(())
                }
            } else {
                quote! {
                    #(#encode_lets)*
                    let mut conn = self.__autumn_acquire_conn().await?;
                    ::autumn_web::reexports::diesel::delete(#query_source #(#filters)*)
                        .execute(&mut conn)
                        .await
                        .map_err(::autumn_web::AutumnError::from)?;
                    Ok(())
                }
            }
        }
        "exists" => {
            quote! {
                #(#encode_lets)*
                let mut conn = self.__autumn_acquire_read_conn().await?;
                ::autumn_web::reexports::diesel::select(
                    ::autumn_web::reexports::diesel::dsl::exists(
                        #query_source #(#filters)* #soft_delete_filter
                    )
                )
                .get_result::<bool>(&mut conn)
                .await
                .map_err(::autumn_web::AutumnError::from)
            }
        }
        _ => {
            let msg = format!(
                "Unsupported query prefix: {}. Supported prefixes are find, count, delete, exists.",
                query.prefix
            );
            quote! { ::core::compile_error!(#msg); }
        }
    }
}
fn generate_derived_query(
query: &DerivedQuery,
table_ident: &Ident,
model_name: &Ident,
soft_delete: bool,
tenant_scoped: bool,
string_fields: &std::collections::HashSet<String>,
) -> TokenStream {
if tenant_scoped {
let across_tenants_query = generate_derived_query_for_source(
query,
table_ident,
model_name,
soft_delete,
"e! { #table_ident::table },
string_fields,
);
let scoped_query = generate_derived_query_for_source(
query,
table_ident,
model_name,
soft_delete,
"e! { #table_ident::table.filter(#table_ident::tenant_id.eq(tenant_id)) },
string_fields,
);
quote! {
if self.across_tenants {
#across_tenants_query
} else {
let tenant_id = match ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten() {
::core::option::Option::Some(t) => t,
::core::option::Option::None => {
return ::core::result::Result::Err(::autumn_web::AutumnError::internal_server_error_msg(
"no tenant context was established"
));
}
};
#scoped_query
}
}
} else {
generate_derived_query_for_source(
query,
table_ident,
model_name,
soft_delete,
"e! { #table_ident::table },
string_fields,
)
}
}
/// Reports whether `ty` is a path type whose last segment is `String` or `str`, looking
/// through any number of reference layers (`&str`, `&String`, `&&str`, ...).
///
/// Any other kind of type yields `false`.
fn is_string_param_type(ty: &syn::Type) -> bool {
    // Peel off references until the referent is reached.
    let mut innermost = ty;
    while let syn::Type::Reference(reference) = innermost {
        innermost = &*reference.elem;
    }

    let syn::Type::Path(type_path) = innermost else {
        return false;
    };
    type_path
        .path
        .segments
        .last()
        .is_some_and(|segment| segment.ident == "String" || segment.ident == "str")
}
fn vh_insert_ts(
table_name_str: &str,
op: &str,
with_ctx: bool,
record_expr: &TokenStream,
before_expr: Option<&TokenStream>,
conn_ident: &TokenStream,
model_ident: &proc_macro2::Ident,
) -> TokenStream {
let actor_ts = if with_ctx {
quote! { ctx.actor.as_deref().unwrap_or("system") }
} else {
quote! { "system" }
};
let request_id_ts = if with_ctx {
quote! { ctx.request_id.as_deref() }
} else {
quote! { ::core::option::Option::None::<&str> }
};
let changes_ts = match op {
"insert" => quote! {
{
use ::autumn_web::version_history::VersionedRecord as _;
let __vh_json = (#record_expr).version_column_values();
let __vh_changes = ::autumn_web::version_history::compute_insert_changes(&__vh_json, <#model_ident as ::autumn_web::version_history::VersionedRecord>::version_sensitive_columns());
::autumn_web::reexports::serde_json::to_string(&__vh_changes)
.unwrap_or_else(|_| "[]".to_string())
}
},
"delete" => quote! {
{
use ::autumn_web::version_history::VersionedRecord as _;
let __vh_json = (#record_expr).version_column_values();
let __vh_changes = ::autumn_web::version_history::compute_delete_changes(&__vh_json, <#model_ident as ::autumn_web::version_history::VersionedRecord>::version_sensitive_columns());
::autumn_web::reexports::serde_json::to_string(&__vh_changes)
.unwrap_or_else(|_| "[]".to_string())
}
},
_ => {
let before = before_expr.unwrap_or(record_expr);
quote! {
{
use ::autumn_web::version_history::VersionedRecord as _;
let __vh_before_json = (#before).version_column_values();
let __vh_after_json = (#record_expr).version_column_values();
let __vh_changes = ::autumn_web::version_history::compute_diff(&__vh_before_json, &__vh_after_json, <#model_ident as ::autumn_web::version_history::VersionedRecord>::version_sensitive_columns());
::autumn_web::reexports::serde_json::to_string(&__vh_changes)
.unwrap_or_else(|_| "[]".to_string())
}
}
}
};
let table_name_ts = table_name_str.to_string();
quote! {
{
let __vh_changes_str: ::std::string::String = #changes_ts;
let __vh_record_id: i64 = {
use ::autumn_web::version_history::VersionedRecord as _;
(#record_expr).version_record_id()
};
let __vh_tenant_id: ::core::option::Option<&str> = {
use ::autumn_web::version_history::VersionedRecord as _;
(#record_expr).version_tenant_id()
};
let __vh_actor: &str = #actor_ts;
let __vh_request_id: ::core::option::Option<&str> = #request_id_ts;
::autumn_web::reexports::diesel::sql_query(
"INSERT INTO _autumn_version_history \
(table_name, tenant_id, record_id, op, actor, request_id, changes) \
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb)"
)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(#table_name_ts)
.bind::<::autumn_web::reexports::diesel::sql_types::Nullable<::autumn_web::reexports::diesel::sql_types::Text>, _>(__vh_tenant_id)
.bind::<::autumn_web::reexports::diesel::sql_types::BigInt, _>(__vh_record_id)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(#op)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(__vh_actor)
.bind::<::autumn_web::reexports::diesel::sql_types::Nullable<::autumn_web::reexports::diesel::sql_types::Text>, _>(__vh_request_id)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(__vh_changes_str)
.execute(#conn_ident)
.await
.map_err(::autumn_web::AutumnError::from)?;
}
}
}
#[allow(
clippy::too_many_lines,
clippy::option_if_let_else,
clippy::large_stack_frames
)]
#[allow(clippy::cognitive_complexity)]
pub fn repository_macro(attr: TokenStream, item: TokenStream) -> TokenStream {
let config = match parse_repo_args(attr) {
Ok(c) => c,
Err(err) => return err.to_compile_error(),
};
let trait_def: ItemTrait = match syn::parse2(item) {
Ok(t) => t,
Err(err) => return err.to_compile_error(),
};
let model_name = &config.model_name;
let table_name = &config.table_name;
let table_ident = format_ident!("{table_name}");
let trait_name = &trait_def.ident;
let pg_name = format_ident!("Pg{trait_name}");
let new_name = format_ident!("New{model_name}");
let update_name = format_ident!("Update{model_name}");
let vis = &trait_def.vis;
let commit_hooks_enabled = config.hooks_type.is_some() && config.commit_hooks;
let tenant_extra = usize::from(config.tenant_scoped);
let sd_filter = if config.soft_delete {
quote! { .filter(#table_ident::deleted_at.is_null()) }
} else {
quote! {}
};
let mut derived_trait_methods = Vec::new();
let mut derived_impl_methods = Vec::new();
for item in &trait_def.items {
if let TraitItem::Fn(method) = item {
let method_name = method.sig.ident.to_string();
if let Some(query) = parse_query_name(&method_name) {
let fn_ident = &method.sig.ident;
let user_params: Vec<TokenStream> = method
.sig
.inputs
.iter()
.filter_map(|arg| {
if let syn::FnArg::Typed(pat_type) = arg {
let pat = &pat_type.pat;
let ty = &pat_type.ty;
Some(quote! { #pat: #ty })
} else {
None }
})
.collect();
let string_fields: std::collections::HashSet<String> = method
.sig
.inputs
.iter()
.filter_map(|arg| {
let syn::FnArg::Typed(pat_type) = arg else {
return None;
};
let syn::Pat::Ident(pat_ident) = pat_type.pat.as_ref() else {
return None;
};
is_string_param_type(&pat_type.ty).then(|| pat_ident.ident.to_string())
})
.collect();
let return_type = match query.prefix.as_str() {
"find" => quote! { Vec<#model_name> },
"count" => quote! { i64 },
"exists" => quote! { bool },
_ => quote! { () }, };
let params = &user_params;
let body = generate_derived_query(
&query,
&table_ident,
model_name,
config.soft_delete,
config.tenant_scoped,
&string_fields,
);
derived_trait_methods.push(quote! {
fn #fn_ident(&self, #(#params),*) -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<#return_type>> + Send;
});
derived_impl_methods.push(quote! {
async fn #fn_ident(&self, #(#params),*) -> ::autumn_web::AutumnResult<#return_type> {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
#body
}
});
}
}
}
let tenant_struct_field = if config.tenant_scoped {
quote! { across_tenants: bool, }
} else {
quote! {}
};
let tenant_clone_field = if config.tenant_scoped {
quote! { across_tenants: self.across_tenants, }
} else {
quote! {}
};
let tenant_init_field = if config.tenant_scoped {
quote! { across_tenants: false, }
} else {
quote! {}
};
let across_tenants_method = if config.tenant_scoped {
if let Some(hooks_ident) = &config.hooks_type {
let idempotency_clone_field = if commit_hooks_enabled {
quote! {
idempotency: self.idempotency.clone(),
}
} else {
quote! {}
};
quote! {
pub fn across_tenants(&self) -> Self {
if ::core::cfg!(debug_assertions) {
::autumn_web::reexports::tracing::warn!("across_tenants() called on tenant_scoped repository");
}
Self {
pool: self.pool.clone(),
hooks: <#hooks_ident as ::autumn_web::hooks::RepositoryHooksClone>::autumn_clone(&self.hooks),
#idempotency_clone_field
across_tenants: true,
__autumn_read_route: self.__autumn_read_route.clone(),
__autumn_statement_timeout_ms: self.__autumn_statement_timeout_ms,
__autumn_slow_threshold: self.__autumn_slow_threshold,
__autumn_route: self.__autumn_route.clone(),
}
}
}
} else {
quote! {
pub fn across_tenants(&self) -> Self {
if ::core::cfg!(debug_assertions) {
::autumn_web::reexports::tracing::warn!("across_tenants() called on tenant_scoped repository");
}
Self {
pool: self.pool.clone(),
across_tenants: true,
__autumn_read_route: self.__autumn_read_route.clone(),
__autumn_statement_timeout_ms: self.__autumn_statement_timeout_ms,
__autumn_slow_threshold: self.__autumn_slow_threshold,
__autumn_route: self.__autumn_route.clone(),
}
}
}
}
} else {
quote! {}
};
let read_route_init = if config.primary_reads {
quote! {
let __autumn_read_route = ::autumn_web::repository::ReadRoute::Primary;
}
} else {
quote! {
let __autumn_read_route =
::autumn_web::repository::ReadRoute::from_state(state);
}
};
let (
struct_fields,
clone_impl,
extractor_init,
save_body,
update_body,
delete_body,
hook_support_methods,
hook_inventory_registration,
save_many_body,
save_many_skip_invalid_body,
update_many_body,
delete_many_body,
upsert_many_body,
) = if let Some(ref hooks_ident) = config.hooks_type {
let idempotency_struct_field = if commit_hooks_enabled {
quote! {
idempotency: ::core::option::Option<::autumn_web::idempotency::IdempotencyContext>,
}
} else {
quote! {}
};
let idempotency_clone_field = if commit_hooks_enabled {
quote! {
idempotency: self.idempotency.clone(),
}
} else {
quote! {}
};
let struct_fields = quote! {
pool: ::autumn_web::reexports::diesel_async::pooled_connection::deadpool::Pool<
::autumn_web::reexports::diesel_async::AsyncPgConnection,
>,
hooks: #hooks_ident,
#idempotency_struct_field
#tenant_struct_field
__autumn_read_route: ::autumn_web::repository::ReadRoute,
__autumn_statement_timeout_ms: u64,
__autumn_slow_threshold: ::std::time::Duration,
__autumn_route: ::std::option::Option<::std::string::String>,
};
let clone_impl = quote! {
impl ::core::clone::Clone for #pg_name {
fn clone(&self) -> Self {
Self {
pool: self.pool.clone(),
hooks: <#hooks_ident as ::autumn_web::hooks::RepositoryHooksClone>::autumn_clone(&self.hooks),
#idempotency_clone_field
#tenant_clone_field
__autumn_read_route: self.__autumn_read_route.clone(),
__autumn_statement_timeout_ms: self.__autumn_statement_timeout_ms,
__autumn_slow_threshold: self.__autumn_slow_threshold,
__autumn_route: self.__autumn_route.clone(),
}
}
}
};
let timeout_route_init = quote! {
use ::autumn_web::db::DbState as _;
const __AUTUMN_PG_TIMEOUT_MAX_MS: u64 = i32::MAX as u64;
let __autumn_timeout_ms: u64 = _parts
.extensions
.get::<::autumn_web::db::StatementTimeout>()
.map(|t| ::std::convert::TryFrom::try_from(t.0.as_millis()).unwrap_or(u64::MAX))
.or_else(|| state.statement_timeout().map(|d| ::std::convert::TryFrom::try_from(d.as_millis()).unwrap_or(u64::MAX)))
.unwrap_or(0u64)
.min(__AUTUMN_PG_TIMEOUT_MAX_MS);
let __autumn_slow_threshold = state.slow_query_threshold();
let __autumn_route: ::std::option::Option<::std::string::String> = _parts
.extensions
.get::<::autumn_web::reexports::axum::extract::MatchedPath>()
.map(|p| p.as_str().to_owned());
#read_route_init
};
let extractor_init = if commit_hooks_enabled {
quote! {
#pg_name::__autumn_register_repository_commit_hooks();
#timeout_route_init
Ok(#pg_name {
pool,
hooks: <#hooks_ident as ::autumn_web::hooks::RepositoryHooksDefault>::autumn_default(),
idempotency: _parts
.extensions
.get::<::autumn_web::idempotency::IdempotencyContext>()
.cloned(),
#tenant_init_field
__autumn_read_route,
__autumn_statement_timeout_ms: __autumn_timeout_ms,
__autumn_slow_threshold,
__autumn_route,
})
}
} else {
quote! {
#timeout_route_init
Ok(#pg_name {
pool,
hooks: <#hooks_ident as ::autumn_web::hooks::RepositoryHooksDefault>::autumn_default(),
#tenant_init_field
__autumn_read_route,
__autumn_statement_timeout_ms: __autumn_timeout_ms,
__autumn_slow_threshold,
__autumn_route,
})
}
};
let hook_support_methods = if commit_hooks_enabled {
quote! {
#[doc(hidden)]
fn __autumn_repository_commit_hook_key() -> &'static str {
::core::concat!(
::core::env!("CARGO_PKG_NAME"),
"::",
::core::module_path!(),
"::",
::core::stringify!(#table_ident),
"::",
::core::stringify!(#model_name),
"::",
::core::stringify!(#hooks_ident)
)
}
#[doc(hidden)]
fn __autumn_register_repository_commit_hooks() {
static __AUTUMN_REGISTERED: ::std::sync::OnceLock<()> = ::std::sync::OnceLock::new();
__AUTUMN_REGISTERED.get_or_init(|| {
::autumn_web::__private::register_repository_commit_hook_runner(
Self::__autumn_repository_commit_hook_key(),
|__ctx, __record| async move {
let mut __ctx: ::autumn_web::hooks::MutationContext =
::autumn_web::reexports::serde_json::from_value(__ctx)
.map_err(|__error| {
::autumn_web::AutumnError::internal_server_error_msg(
format!("deserialize repository create hook context: {__error}")
)
})?;
let __record: #model_name =
#model_name::__autumn_commit_hook_from_value(__record)?;
let __hooks =
<#hooks_ident as ::autumn_web::hooks::RepositoryHooksDefault>::autumn_default();
<#hooks_ident as ::autumn_web::hooks::MutationHooks>::after_create_commit(
&__hooks,
&mut __ctx,
&__record,
)
.await
},
|__ctx, __record| async move {
let mut __ctx: ::autumn_web::hooks::MutationContext =
::autumn_web::reexports::serde_json::from_value(__ctx)
.map_err(|__error| {
::autumn_web::AutumnError::internal_server_error_msg(
format!("deserialize repository update hook context: {__error}")
)
})?;
let __record: #model_name =
#model_name::__autumn_commit_hook_from_value(__record)?;
let __hooks =
<#hooks_ident as ::autumn_web::hooks::RepositoryHooksDefault>::autumn_default();
<#hooks_ident as ::autumn_web::hooks::MutationHooks>::after_update_commit(
&__hooks,
&mut __ctx,
&__record,
)
.await
},
|__ctx, __record| async move {
let mut __ctx: ::autumn_web::hooks::MutationContext =
::autumn_web::reexports::serde_json::from_value(__ctx)
.map_err(|__error| {
::autumn_web::AutumnError::internal_server_error_msg(
format!("deserialize repository delete hook context: {__error}")
)
})?;
let __record: #model_name =
#model_name::__autumn_commit_hook_from_value(__record)?;
let __hooks =
<#hooks_ident as ::autumn_web::hooks::RepositoryHooksDefault>::autumn_default();
<#hooks_ident as ::autumn_web::hooks::MutationHooks>::after_delete_commit(
&__hooks,
&mut __ctx,
&__record,
)
.await
},
);
});
}
}
} else {
quote! {}
};
let hook_inventory_registration = if commit_hooks_enabled {
quote! {
::autumn_web::reexports::inventory::submit! {
::autumn_web::__private::RepositoryCommitHookDescriptor {
register: #pg_name::__autumn_register_repository_commit_hooks,
}
}
}
} else {
quote! {}
};
let vh_create_in_hooks = if config.versioned {
let vh = vh_insert_ts(
table_name,
"insert",
true,
"e! { record },
None,
"e! { conn },
model_name,
);
quote! { #vh }
} else {
quote! {}
};
let save_body = if config.tenant_scoped {
if commit_hooks_enabled {
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks};
let tenant_id = if self.across_tenants {
::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
} else {
let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
.ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
::core::option::Option::Some(t)
};
Self::__autumn_register_repository_commit_hooks();
let mut conn = self.__autumn_acquire_conn().await?;
let (record, mut ctx, __autumn_commit_hook_id, __autumn_commit_hook_owner, __autumn_commit_hook_record) = conn
.transaction::<(#model_name, MutationContext, ::std::string::String, ::std::string::String, ::autumn_web::reexports::serde_json::Value), ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut input = new.clone();
let mut ctx = MutationContext::new(MutationOp::Create);
let mut __autumn_commit_hook_discriminator: ::core::option::Option<::std::string::String> =
::core::option::Option::None;
if let ::core::option::Option::Some(__autumn_idempotency) = &self.idempotency {
ctx.set_idempotency_key(__autumn_idempotency.scoped_key());
__autumn_commit_hook_discriminator =
::core::option::Option::Some(__autumn_idempotency.next_mutation_discriminator());
}
self.hooks.before_create(&mut ctx, &mut input).await?;
let record = if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(::autumn_web::tenancy::TenantInsertable::tenant_values(input.clone(), t))
.get_result::<#model_name>(conn)
.await
} else {
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(input)
.get_result::<#model_name>(conn)
.await
}
.map_err(::autumn_web::AutumnError::from)?;
#vh_create_in_hooks
let __autumn_commit_hook_record = record.__autumn_commit_hook_to_value()?;
let (__autumn_commit_hook_id, __autumn_commit_hook_owner) = ::autumn_web::__private::enqueue_repository_commit_hook_pending_on_conn(
conn,
Self::__autumn_repository_commit_hook_key(),
"create",
ctx.idempotency_key.as_deref(),
__autumn_commit_hook_discriminator.as_deref(),
&ctx,
&__autumn_commit_hook_record,
)
.await?;
Ok((record, ctx, __autumn_commit_hook_id, __autumn_commit_hook_owner, __autumn_commit_hook_record))
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
let __autumn_pending_heartbeat =
::autumn_web::__private::start_repository_commit_hook_pending_finalizer_heartbeat(
self.pool.clone(),
__autumn_commit_hook_id.clone(),
__autumn_commit_hook_owner.clone(),
);
let __autumn_after_create = ::autumn_web::__private::catch_repository_after_hook_unwind(
self.hooks.after_create(&mut ctx, &record)
)
.await;
match __autumn_after_create {
::core::result::Result::Ok(::core::result::Result::Ok(())) => {}
::core::result::Result::Ok(::core::result::Result::Err(__autumn_error)) => {
let __autumn_error_message = ::std::format!("{__autumn_error}");
::autumn_web::__private::mark_repository_commit_hook_after_hook_failed(
&self.pool,
&__autumn_commit_hook_id,
&__autumn_commit_hook_owner,
__autumn_error_message,
)
.await;
__autumn_pending_heartbeat.cancel();
return ::core::result::Result::Err(
::autumn_web::idempotency::__cache_committed_error_response(__autumn_error)
);
}
::core::result::Result::Err(__autumn_panic) => {
::autumn_web::__private::mark_repository_commit_hook_after_hook_failed(
&self.pool,
&__autumn_commit_hook_id,
&__autumn_commit_hook_owner,
"after_create panicked",
)
.await;
__autumn_pending_heartbeat.cancel();
if self.idempotency.is_some() {
return ::core::result::Result::Err(
::autumn_web::idempotency::__cache_committed_error_response(
::autumn_web::AutumnError::internal_server_error_msg("after_create panicked")
)
);
}
::std::panic::resume_unwind(__autumn_panic);
}
}
let __autumn_finalize_result = ::autumn_web::__private::finalize_repository_commit_hook_after_hook(
&self.pool,
&__autumn_commit_hook_id,
&__autumn_commit_hook_owner,
&ctx,
&__autumn_commit_hook_record,
)
.await;
__autumn_pending_heartbeat.cancel();
match __autumn_finalize_result {
::core::result::Result::Ok(()) => {
::autumn_web::__private::kick_repository_commit_hook_dispatcher(&self.pool);
}
::core::result::Result::Err(__autumn_error) => {
::autumn_web::reexports::tracing::warn!(
hook_id = %__autumn_commit_hook_id,
error = %__autumn_error,
"failed to finalize repository create commit hook after mutation commit; failing request closed"
);
return ::core::result::Result::Err(
::autumn_web::idempotency::__cache_committed_error_response(__autumn_error)
);
}
}
Ok(record)
}
} else {
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks};
let tenant_id = if self.across_tenants {
::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
} else {
let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
.ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
::core::option::Option::Some(t)
};
let mut conn = self.__autumn_acquire_conn().await?;
let (record, mut ctx) = conn
.transaction::<(#model_name, MutationContext), ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut input = new.clone();
let mut ctx = MutationContext::new(MutationOp::Create);
self.hooks.before_create(&mut ctx, &mut input).await?;
let record = if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(::autumn_web::tenancy::TenantInsertable::tenant_values(input.clone(), t))
.get_result::<#model_name>(conn)
.await
} else {
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(input)
.get_result::<#model_name>(conn)
.await
}
.map_err(::autumn_web::AutumnError::from)?;
#vh_create_in_hooks
Ok((record, ctx))
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
self.hooks.after_create(&mut ctx, &record).await?;
Ok(record)
}
}
} else {
if commit_hooks_enabled {
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks};
Self::__autumn_register_repository_commit_hooks();
let mut conn = self.__autumn_acquire_conn().await?;
let (record, mut ctx, __autumn_commit_hook_id, __autumn_commit_hook_owner, __autumn_commit_hook_record) = conn
.transaction::<(#model_name, MutationContext, ::std::string::String, ::std::string::String, ::autumn_web::reexports::serde_json::Value), ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut input = new.clone();
let mut ctx = MutationContext::new(MutationOp::Create);
let mut __autumn_commit_hook_discriminator: ::core::option::Option<::std::string::String> =
::core::option::Option::None;
if let ::core::option::Option::Some(__autumn_idempotency) = &self.idempotency {
ctx.set_idempotency_key(__autumn_idempotency.scoped_key());
__autumn_commit_hook_discriminator =
::core::option::Option::Some(__autumn_idempotency.next_mutation_discriminator());
}
self.hooks.before_create(&mut ctx, &mut input).await?;
let record = ::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(input)
.get_result::<#model_name>(conn)
.await
.map_err(::autumn_web::AutumnError::from)?;
#vh_create_in_hooks
let __autumn_commit_hook_record = record.__autumn_commit_hook_to_value()?;
let (__autumn_commit_hook_id, __autumn_commit_hook_owner) = ::autumn_web::__private::enqueue_repository_commit_hook_pending_on_conn(
conn,
Self::__autumn_repository_commit_hook_key(),
"create",
ctx.idempotency_key.as_deref(),
__autumn_commit_hook_discriminator.as_deref(),
&ctx,
&__autumn_commit_hook_record,
)
.await?;
Ok((record, ctx, __autumn_commit_hook_id, __autumn_commit_hook_owner, __autumn_commit_hook_record))
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
let __autumn_pending_heartbeat =
::autumn_web::__private::start_repository_commit_hook_pending_finalizer_heartbeat(
self.pool.clone(),
__autumn_commit_hook_id.clone(),
__autumn_commit_hook_owner.clone(),
);
let __autumn_after_create = ::autumn_web::__private::catch_repository_after_hook_unwind(
self.hooks.after_create(&mut ctx, &record)
)
.await;
match __autumn_after_create {
::core::result::Result::Ok(::core::result::Result::Ok(())) => {}
::core::result::Result::Ok(::core::result::Result::Err(__autumn_error)) => {
let __autumn_error_message = ::std::format!("{__autumn_error}");
::autumn_web::__private::mark_repository_commit_hook_after_hook_failed(
&self.pool,
&__autumn_commit_hook_id,
&__autumn_commit_hook_owner,
__autumn_error_message,
)
.await;
__autumn_pending_heartbeat.cancel();
return ::core::result::Result::Err(
::autumn_web::idempotency::__cache_committed_error_response(__autumn_error)
);
}
::core::result::Result::Err(__autumn_panic) => {
::autumn_web::__private::mark_repository_commit_hook_after_hook_failed(
&self.pool,
&__autumn_commit_hook_id,
&__autumn_commit_hook_owner,
"after_create panicked",
)
.await;
__autumn_pending_heartbeat.cancel();
if self.idempotency.is_some() {
return ::core::result::Result::Err(
::autumn_web::idempotency::__cache_committed_error_response(
::autumn_web::AutumnError::internal_server_error_msg("after_create panicked")
)
);
}
::std::panic::resume_unwind(__autumn_panic);
}
}
let __autumn_finalize_result = ::autumn_web::__private::finalize_repository_commit_hook_after_hook(
&self.pool,
&__autumn_commit_hook_id,
&__autumn_commit_hook_owner,
&ctx,
&__autumn_commit_hook_record,
)
.await;
__autumn_pending_heartbeat.cancel();
match __autumn_finalize_result {
::core::result::Result::Ok(()) => {
::autumn_web::__private::kick_repository_commit_hook_dispatcher(&self.pool);
}
::core::result::Result::Err(__autumn_error) => {
::autumn_web::reexports::tracing::warn!(
hook_id = %__autumn_commit_hook_id,
error = %__autumn_error,
"failed to finalize repository create commit hook after mutation commit; failing request closed"
);
return ::core::result::Result::Err(
::autumn_web::idempotency::__cache_committed_error_response(__autumn_error)
);
}
}
Ok(record)
}
} else if config.versioned {
// Fragment interpolated as `#vh_insert` right after the row insert in the
// template below. The two quoted fragments name the `record` and `conn`
// bindings of that generated transaction body.
// NOTE(review): `vh_insert_ts` is defined elsewhere; the meaning of the
// `"insert"` / `true` / `None` arguments is presumed (operation label, flag,
// no "before" snapshot) — confirm against its definition.
let vh_insert = vh_insert_ts(
    table_name,
    "insert",
    true,
    &quote! { record },
    None,
    &quote! { conn },
    model_name,
);
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks};
let mut conn = self.__autumn_acquire_conn().await?;
let (record, mut ctx) = conn
.transaction::<(#model_name, MutationContext), ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut input = new.clone();
let mut ctx = MutationContext::new(MutationOp::Create);
self.hooks.before_create(&mut ctx, &mut input).await?;
let record = ::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(input)
.get_result::<#model_name>(conn)
.await
.map_err(::autumn_web::AutumnError::from)?;
#vh_insert
Ok((record, ctx))
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
self.hooks.after_create(&mut ctx, &record).await?;
Ok(record)
}
} else {
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks};
let mut conn = self.__autumn_acquire_conn().await?;
let (record, mut ctx) = conn
.transaction::<(#model_name, MutationContext), ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut input = new.clone();
let mut ctx = MutationContext::new(MutationOp::Create);
self.hooks.before_create(&mut ctx, &mut input).await?;
let record = ::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(input)
.get_result::<#model_name>(conn)
.await
.map_err(::autumn_web::AutumnError::from)?;
Ok((record, ctx))
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
self.hooks.after_create(&mut ctx, &record).await?;
Ok(record)
}
}
};
let draft_ext_trait = format_ident!("{}DraftExt", model_name);
let vh_update_in_hooks = if config.versioned {
let vh = vh_insert_ts(
table_name,
"update",
true,
"e! { record },
Some("e! { __vh_before }),
"e! { conn },
model_name,
);
quote! { #vh }
} else {
quote! {}
};
let update_body = if config.tenant_scoped {
if commit_hooks_enabled {
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks, UpdateDraft};
use ::autumn_web::repository::{AutumnLockVersionModelExt as _, AutumnLockVersionUpdateExt as _};
let tenant_id = if self.across_tenants {
::core::option::Option::None
} else {
let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
.ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
::core::option::Option::Some(t)
};
Self::__autumn_register_repository_commit_hooks();
let mut conn = self.__autumn_acquire_conn().await?;
let (record, mut ctx, __autumn_commit_hook_id, __autumn_commit_hook_owner, __autumn_commit_hook_record) = conn
.transaction::<(#model_name, MutationContext, ::std::string::String, ::std::string::String, ::autumn_web::reexports::serde_json::Value), ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut ctx = MutationContext::new(MutationOp::Update);
let mut __autumn_commit_hook_discriminator: ::core::option::Option<::std::string::String> =
::core::option::Option::None;
if let ::core::option::Option::Some(__autumn_idempotency) = &self.idempotency {
ctx.set_idempotency_key(__autumn_idempotency.scoped_key());
__autumn_commit_hook_discriminator =
::core::option::Option::Some(__autumn_idempotency.next_mutation_discriminator());
}
let (record, __vh_before): (#model_name, ::core::option::Option<#model_name>) = if let ::core::option::Option::Some(expected_version) =
changes.__autumn_lock_version_expected()
{
let load_query = #table_ident::table.find(id);
let current = if let ::core::option::Option::Some(ref t) = tenant_id {
load_query.filter(#table_ident::tenant_id.eq(t)).for_update().first::<#model_name>(conn).await
} else {
load_query.for_update().first::<#model_name>(conn).await
}
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
if let ::core::option::Option::Some(actual_version) =
current.__autumn_lock_version_actual()
{
if actual_version != expected_version {
return Err(::autumn_web::AutumnError::conflict(
::autumn_web::RepositoryError::Conflict {
id,
expected_version,
actual_version: ::core::option::Option::Some(actual_version),
},
));
}
}
let __vh_before_inner = current.clone();
let mut draft = <UpdateDraft<#model_name> as #draft_ext_trait>::from_patch(¤t, changes)?;
if let ::core::option::Option::Some(ref t) = tenant_id {
draft.after.tenant_id = t.clone();
}
self.hooks.before_update(&mut ctx, &mut draft).await?;
if let ::core::option::Option::Some(ref t) = tenant_id {
draft.after.tenant_id = t.clone();
}
let proposed = draft.into_after();
let update_target = #table_ident::table.find(id);
let updated = if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::update(update_target.filter(#table_ident::tenant_id.eq(t)))
.set(proposed.clone())
.get_result::<#model_name>(conn)
.await
} else {
::autumn_web::reexports::diesel::update(update_target)
.set(proposed.clone())
.get_result::<#model_name>(conn)
.await
}
.map_err(::autumn_web::AutumnError::from)?;
(updated, ::core::option::Option::Some(__vh_before_inner))
} else {
let load_query = #table_ident::table.find(id);
let current = if let ::core::option::Option::Some(ref t) = tenant_id {
load_query.filter(#table_ident::tenant_id.eq(t)).for_update().first::<#model_name>(conn).await
} else {
load_query.for_update().first::<#model_name>(conn).await
}
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
let __vh_before_inner = current.clone();
let mut draft = <UpdateDraft<#model_name> as #draft_ext_trait>::from_patch(¤t, changes)?;
if let ::core::option::Option::Some(ref t) = tenant_id {
draft.after.tenant_id = t.clone();
}
self.hooks.before_update(&mut ctx, &mut draft).await?;
if let ::core::option::Option::Some(ref t) = tenant_id {
draft.after.tenant_id = t.clone();
}
let proposed = draft.into_after();
let update_target = #table_ident::table.find(id);
let updated = if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::update(update_target.filter(#table_ident::tenant_id.eq(t)))
.set(proposed.clone())
.get_result::<#model_name>(conn)
.await
} else {
::autumn_web::reexports::diesel::update(update_target)
.set(proposed.clone())
.get_result::<#model_name>(conn)
.await
}
.map_err(::autumn_web::AutumnError::from)?;
(updated, ::core::option::Option::Some(__vh_before_inner))
};
if let ::core::option::Option::Some(ref __vh_before) = __vh_before {
#vh_update_in_hooks
}
let __autumn_commit_hook_record = record.__autumn_commit_hook_to_value()?;
let (__autumn_commit_hook_id, __autumn_commit_hook_owner) = ::autumn_web::__private::enqueue_repository_commit_hook_pending_on_conn(
conn,
Self::__autumn_repository_commit_hook_key(),
"update",
ctx.idempotency_key.as_deref(),
__autumn_commit_hook_discriminator.as_deref(),
&ctx,
&__autumn_commit_hook_record,
)
.await?;
Ok((record, ctx, __autumn_commit_hook_id, __autumn_commit_hook_owner, __autumn_commit_hook_record))
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
let __autumn_pending_heartbeat =
::autumn_web::__private::start_repository_commit_hook_pending_finalizer_heartbeat(
self.pool.clone(),
__autumn_commit_hook_id.clone(),
__autumn_commit_hook_owner.clone(),
);
let __autumn_after_update = ::autumn_web::__private::catch_repository_after_hook_unwind(
self.hooks.after_update(&mut ctx, &record)
)
.await;
match __autumn_after_update {
::core::result::Result::Ok(::core::result::Result::Ok(())) => {}
::core::result::Result::Ok(::core::result::Result::Err(__autumn_error)) => {
let __autumn_error_message = ::std::format!("{__autumn_error}");
::autumn_web::__private::mark_repository_commit_hook_after_hook_failed(
&self.pool,
&__autumn_commit_hook_id,
&__autumn_commit_hook_owner,
__autumn_error_message,
)
.await;
__autumn_pending_heartbeat.cancel();
return ::core::result::Result::Err(
::autumn_web::idempotency::__cache_committed_error_response(__autumn_error)
);
}
::core::result::Result::Err(__autumn_panic) => {
::autumn_web::__private::mark_repository_commit_hook_after_hook_failed(
&self.pool,
&__autumn_commit_hook_id,
&__autumn_commit_hook_owner,
"after_update panicked",
)
.await;
__autumn_pending_heartbeat.cancel();
if self.idempotency.is_some() {
return ::core::result::Result::Err(
::autumn_web::idempotency::__cache_committed_error_response(
::autumn_web::AutumnError::internal_server_error_msg("after_update panicked")
)
);
}
::std::panic::resume_unwind(__autumn_panic);
}
}
let __autumn_finalize_result = ::autumn_web::__private::finalize_repository_commit_hook_after_hook(
&self.pool,
&__autumn_commit_hook_id,
&__autumn_commit_hook_owner,
&ctx,
&__autumn_commit_hook_record,
)
.await;
__autumn_pending_heartbeat.cancel();
match __autumn_finalize_result {
::core::result::Result::Ok(()) => {
::autumn_web::__private::kick_repository_commit_hook_dispatcher(&self.pool);
}
::core::result::Result::Err(__autumn_error) => {
::autumn_web::reexports::tracing::warn!(
hook_id = %__autumn_commit_hook_id,
error = %__autumn_error,
"failed to finalize repository update commit hook after mutation commit; failing request closed"
);
return ::core::result::Result::Err(
::autumn_web::idempotency::__cache_committed_error_response(__autumn_error)
);
}
}
Ok(record)
}
} else {
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks, UpdateDraft};
use ::autumn_web::repository::{AutumnLockVersionModelExt as _, AutumnLockVersionUpdateExt as _};
let tenant_id = if self.across_tenants {
::core::option::Option::None
} else {
let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
.ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
::core::option::Option::Some(t)
};
let mut conn = self.__autumn_acquire_conn().await?;
let (record, mut ctx) = conn
.transaction::<(#model_name, MutationContext), ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut ctx = MutationContext::new(MutationOp::Update);
let (record, __vh_before): (#model_name, ::core::option::Option<#model_name>) = if let ::core::option::Option::Some(expected_version) =
changes.__autumn_lock_version_expected()
{
let load_query = #table_ident::table.find(id);
let current = if let ::core::option::Option::Some(ref t) = tenant_id {
load_query.filter(#table_ident::tenant_id.eq(t)).for_update().first::<#model_name>(conn).await
} else {
load_query.for_update().first::<#model_name>(conn).await
}
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
if let ::core::option::Option::Some(actual_version) =
current.__autumn_lock_version_actual()
{
if actual_version != expected_version {
return Err(::autumn_web::AutumnError::conflict(
::autumn_web::RepositoryError::Conflict {
id,
expected_version,
actual_version: ::core::option::Option::Some(actual_version),
},
));
}
}
let __vh_before_inner = current.clone();
let mut draft = <UpdateDraft<#model_name> as #draft_ext_trait>::from_patch(¤t, changes)?;
if let ::core::option::Option::Some(ref t) = tenant_id {
draft.after.tenant_id = t.clone();
}
self.hooks.before_update(&mut ctx, &mut draft).await?;
if let ::core::option::Option::Some(ref t) = tenant_id {
draft.after.tenant_id = t.clone();
}
let proposed = draft.into_after();
let update_target = #table_ident::table.find(id);
let updated = if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::update(update_target.filter(#table_ident::tenant_id.eq(t)))
.set(proposed.clone())
.get_result::<#model_name>(conn)
.await
} else {
::autumn_web::reexports::diesel::update(update_target)
.set(proposed.clone())
.get_result::<#model_name>(conn)
.await
}
.map_err(::autumn_web::AutumnError::from)?;
(updated, ::core::option::Option::Some(__vh_before_inner))
} else {
let load_query = #table_ident::table.find(id);
let current = if let ::core::option::Option::Some(ref t) = tenant_id {
load_query.filter(#table_ident::tenant_id.eq(t)).for_update().first::<#model_name>(conn).await
} else {
load_query.for_update().first::<#model_name>(conn).await
}
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
let __vh_before_inner = current.clone();
let mut draft = <UpdateDraft<#model_name> as #draft_ext_trait>::from_patch(¤t, changes)?;
if let ::core::option::Option::Some(ref t) = tenant_id {
draft.after.tenant_id = t.clone();
}
self.hooks.before_update(&mut ctx, &mut draft).await?;
if let ::core::option::Option::Some(ref t) = tenant_id {
draft.after.tenant_id = t.clone();
}
let proposed = draft.into_after();
let update_target = #table_ident::table.find(id);
let updated = if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::update(update_target.filter(#table_ident::tenant_id.eq(t)))
.set(proposed.clone())
.get_result::<#model_name>(conn)
.await
} else {
::autumn_web::reexports::diesel::update(update_target)
.set(proposed.clone())
.get_result::<#model_name>(conn)
.await
}
.map_err(::autumn_web::AutumnError::from)?;
(updated, ::core::option::Option::Some(__vh_before_inner))
};
if let ::core::option::Option::Some(ref __vh_before) = __vh_before {
#vh_update_in_hooks
}
Ok((record, ctx))
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
self.hooks.after_update(&mut ctx, &record).await?;
Ok(record)
}
}
} else {
if commit_hooks_enabled {
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks, UpdateDraft};
use ::autumn_web::repository::{AutumnLockVersionModelExt as _, AutumnLockVersionUpdateExt as _};
Self::__autumn_register_repository_commit_hooks();
let mut conn = self.__autumn_acquire_conn().await?;
let (record, mut ctx, __autumn_commit_hook_id, __autumn_commit_hook_owner, __autumn_commit_hook_record) = conn
.transaction::<(#model_name, MutationContext, ::std::string::String, ::std::string::String, ::autumn_web::reexports::serde_json::Value), ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut ctx = MutationContext::new(MutationOp::Update);
let mut __autumn_commit_hook_discriminator: ::core::option::Option<::std::string::String> =
::core::option::Option::None;
if let ::core::option::Option::Some(__autumn_idempotency) = &self.idempotency {
ctx.set_idempotency_key(__autumn_idempotency.scoped_key());
__autumn_commit_hook_discriminator =
::core::option::Option::Some(__autumn_idempotency.next_mutation_discriminator());
}
let (record, __vh_before): (#model_name, ::core::option::Option<#model_name>) = if let ::core::option::Option::Some(expected_version) =
changes.__autumn_lock_version_expected()
{
let current = #table_ident::table
.find(id)
.for_update()
.first::<#model_name>(conn)
.await
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
if let ::core::option::Option::Some(actual_version) =
current.__autumn_lock_version_actual()
{
if actual_version != expected_version {
return Err(::autumn_web::AutumnError::conflict(
::autumn_web::RepositoryError::Conflict {
id,
expected_version,
actual_version: ::core::option::Option::Some(actual_version),
},
));
}
}
let __vh_before_inner = current.clone();
let mut draft = <UpdateDraft<#model_name> as #draft_ext_trait>::from_patch(¤t, changes)?;
self.hooks.before_update(&mut ctx, &mut draft).await?;
let proposed = draft.into_after();
let updated = ::autumn_web::reexports::diesel::update(#table_ident::table.find(id))
.set(proposed.clone())
.get_result::<#model_name>(conn)
.await
.map_err(::autumn_web::AutumnError::from)?;
(updated, ::core::option::Option::Some(__vh_before_inner))
} else {
let current = #table_ident::table
.find(id)
.for_update()
.first::<#model_name>(conn)
.await
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
let __vh_before_inner = current.clone();
let mut draft = <UpdateDraft<#model_name> as #draft_ext_trait>::from_patch(¤t, changes)?;
self.hooks.before_update(&mut ctx, &mut draft).await?;
let proposed = draft.into_after();
let updated = ::autumn_web::reexports::diesel::update(#table_ident::table.find(id))
.set(proposed.clone())
.get_result::<#model_name>(conn)
.await
.map_err(::autumn_web::AutumnError::from)?;
(updated, ::core::option::Option::Some(__vh_before_inner))
};
if let ::core::option::Option::Some(ref __vh_before) = __vh_before {
#vh_update_in_hooks
}
let __autumn_commit_hook_record = record.__autumn_commit_hook_to_value()?;
let (__autumn_commit_hook_id, __autumn_commit_hook_owner) = ::autumn_web::__private::enqueue_repository_commit_hook_pending_on_conn(
conn,
Self::__autumn_repository_commit_hook_key(),
"update",
ctx.idempotency_key.as_deref(),
__autumn_commit_hook_discriminator.as_deref(),
&ctx,
&__autumn_commit_hook_record,
)
.await?;
Ok((record, ctx, __autumn_commit_hook_id, __autumn_commit_hook_owner, __autumn_commit_hook_record))
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
let __autumn_pending_heartbeat =
::autumn_web::__private::start_repository_commit_hook_pending_finalizer_heartbeat(
self.pool.clone(),
__autumn_commit_hook_id.clone(),
__autumn_commit_hook_owner.clone(),
);
let __autumn_after_update = ::autumn_web::__private::catch_repository_after_hook_unwind(
self.hooks.after_update(&mut ctx, &record)
)
.await;
match __autumn_after_update {
::core::result::Result::Ok(::core::result::Result::Ok(())) => {}
::core::result::Result::Ok(::core::result::Result::Err(__autumn_error)) => {
let __autumn_error_message = ::std::format!("{__autumn_error}");
::autumn_web::__private::mark_repository_commit_hook_after_hook_failed(
&self.pool,
&__autumn_commit_hook_id,
&__autumn_commit_hook_owner,
__autumn_error_message,
)
.await;
__autumn_pending_heartbeat.cancel();
return ::core::result::Result::Err(
::autumn_web::idempotency::__cache_committed_error_response(__autumn_error)
);
}
::core::result::Result::Err(__autumn_panic) => {
::autumn_web::__private::mark_repository_commit_hook_after_hook_failed(
&self.pool,
&__autumn_commit_hook_id,
&__autumn_commit_hook_owner,
"after_update panicked",
)
.await;
__autumn_pending_heartbeat.cancel();
if self.idempotency.is_some() {
return ::core::result::Result::Err(
::autumn_web::idempotency::__cache_committed_error_response(
::autumn_web::AutumnError::internal_server_error_msg("after_update panicked")
)
);
}
::std::panic::resume_unwind(__autumn_panic);
}
}
let __autumn_finalize_result = ::autumn_web::__private::finalize_repository_commit_hook_after_hook(
&self.pool,
&__autumn_commit_hook_id,
&__autumn_commit_hook_owner,
&ctx,
&__autumn_commit_hook_record,
)
.await;
__autumn_pending_heartbeat.cancel();
match __autumn_finalize_result {
::core::result::Result::Ok(()) => {
::autumn_web::__private::kick_repository_commit_hook_dispatcher(&self.pool);
}
::core::result::Result::Err(__autumn_error) => {
::autumn_web::reexports::tracing::warn!(
hook_id = %__autumn_commit_hook_id,
error = %__autumn_error,
"failed to finalize repository update commit hook after mutation commit; failing request closed"
);
return ::core::result::Result::Err(
::autumn_web::idempotency::__cache_committed_error_response(__autumn_error)
);
}
}
Ok(record)
}
} else if config.versioned {
let vh_insert = vh_insert_ts(
table_name,
"update",
true,
"e! { record },
Some("e! { __vh_before }),
"e! { conn },
model_name,
);
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks, UpdateDraft};
use ::autumn_web::repository::{AutumnLockVersionModelExt as _, AutumnLockVersionUpdateExt as _};
let mut conn = self.__autumn_acquire_conn().await?;
let (record, mut ctx) = conn
.transaction::<(#model_name, MutationContext), ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut ctx = MutationContext::new(MutationOp::Update);
let (record, __vh_before): (#model_name, #model_name) = if let ::core::option::Option::Some(expected_version) =
changes.__autumn_lock_version_expected()
{
let current = #table_ident::table
.find(id)
.for_update()
.first::<#model_name>(conn)
.await
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
if let ::core::option::Option::Some(actual_version) =
current.__autumn_lock_version_actual()
{
if actual_version != expected_version {
return Err(::autumn_web::AutumnError::conflict(
::autumn_web::RepositoryError::Conflict {
id,
expected_version,
actual_version: ::core::option::Option::Some(actual_version),
},
));
}
}
let __vh_before = current.clone();
let mut draft = <UpdateDraft<#model_name> as #draft_ext_trait>::from_patch(¤t, changes)?;
self.hooks.before_update(&mut ctx, &mut draft).await?;
let proposed = draft.into_after();
let updated = ::autumn_web::reexports::diesel::update(#table_ident::table.find(id))
.set(proposed.clone())
.get_result::<#model_name>(conn)
.await
.map_err(::autumn_web::AutumnError::from)?;
(updated, __vh_before)
} else {
let current = #table_ident::table
.find(id)
.first::<#model_name>(conn)
.await
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
let __vh_before = current.clone();
let mut draft = <UpdateDraft<#model_name> as #draft_ext_trait>::from_patch(¤t, changes)?;
self.hooks.before_update(&mut ctx, &mut draft).await?;
let proposed = draft.into_after();
let updated = ::autumn_web::reexports::diesel::update(#table_ident::table.find(id))
.set(proposed.clone())
.get_result::<#model_name>(conn)
.await
.map_err(::autumn_web::AutumnError::from)?;
(updated, __vh_before)
};
#vh_insert
Ok((record, ctx))
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
self.hooks.after_update(&mut ctx, &record).await?;
Ok(record)
}
} else {
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks, UpdateDraft};
use ::autumn_web::repository::{AutumnLockVersionModelExt as _, AutumnLockVersionUpdateExt as _};
let mut conn = self.__autumn_acquire_conn().await?;
let (record, mut ctx) = conn
.transaction::<(#model_name, MutationContext), ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut ctx = MutationContext::new(MutationOp::Update);
let record: #model_name = if let ::core::option::Option::Some(expected_version) =
changes.__autumn_lock_version_expected()
{
let current = #table_ident::table
.find(id)
.for_update()
.first::<#model_name>(conn)
.await
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
if let ::core::option::Option::Some(actual_version) =
current.__autumn_lock_version_actual()
{
if actual_version != expected_version {
return Err(::autumn_web::AutumnError::conflict(
::autumn_web::RepositoryError::Conflict {
id,
expected_version,
actual_version: ::core::option::Option::Some(actual_version),
},
));
}
}
let mut draft = <UpdateDraft<#model_name> as #draft_ext_trait>::from_patch(¤t, changes)?;
self.hooks.before_update(&mut ctx, &mut draft).await?;
let proposed = draft.into_after();
::autumn_web::reexports::diesel::update(#table_ident::table.find(id))
.set(proposed.clone())
.get_result::<#model_name>(conn)
.await
.map_err(::autumn_web::AutumnError::from)?
} else {
let current = #table_ident::table
.find(id)
.first::<#model_name>(conn)
.await
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
let mut draft = <UpdateDraft<#model_name> as #draft_ext_trait>::from_patch(¤t, changes)?;
self.hooks.before_update(&mut ctx, &mut draft).await?;
let proposed = draft.into_after();
::autumn_web::reexports::diesel::update(#table_ident::table.find(id))
.set(proposed.clone())
.get_result::<#model_name>(conn)
.await
.map_err(::autumn_web::AutumnError::from)?
};
Ok((record, ctx))
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
self.hooks.after_update(&mut ctx, &record).await?;
Ok(record)
}
}
};
// Statements that remove the row inside the hook-aware delete templates.
// With `config.soft_delete` the row's `deleted_at` is stamped with the current
// UTC time (only where it is still NULL); otherwise the row is deleted.
// In both cases zero affected rows is reported as a not-found error.
let hooked_delete_mutation_stmt = {
    // Emitted after either mutation: bail out when nothing matched `id`.
    let missing_row_guard = quote! {
        if __autumn_deleted == 0 {
            return Err(::autumn_web::AutumnError::not_found_msg(
                format!("{} with id {} not found", stringify!(#model_name), id)
            ));
        }
    };
    // The mutation itself; binds the affected-row count to `__autumn_deleted`.
    let removal_stmt = match config.soft_delete {
        true => quote! {
            let __now = ::autumn_web::reexports::chrono::Utc::now().naive_utc();
            let __autumn_deleted = ::autumn_web::reexports::diesel::update(
                #table_ident::table.find(id).filter(#table_ident::deleted_at.is_null())
            )
            .set(#table_ident::deleted_at.eq(::core::option::Option::Some(__now)))
            .execute(conn)
            .await
            .map_err(::autumn_web::AutumnError::from)?;
        },
        false => quote! {
            let __autumn_deleted = ::autumn_web::reexports::diesel::delete(#table_ident::table.find(id))
                .execute(conn)
                .await
                .map_err(::autumn_web::AutumnError::from)?;
        },
    };
    quote! {
        #removal_stmt
        #missing_row_guard
    }
};
// Fragment spliced into the hook-aware delete templates after the row has been
// removed. It is empty unless `config.versioned` is set. The quoted fragments
// name the generated bindings `record` (the row loaded before deletion) and
// `conn`.
// NOTE(review): `vh_insert_ts` is defined elsewhere; the meaning of the
// `"delete"` / `true` / `None` arguments is presumed — confirm against its
// definition.
let vh_delete_in_hooks = if config.versioned {
    let vh = vh_insert_ts(
        table_name,
        "delete",
        true,
        &quote! { record },
        None,
        &quote! { conn },
        model_name,
    );
    quote! { #vh }
} else {
    quote! {}
};
// Token stream for the body of the generated delete method. Every variant
// opens a transaction, loads the row by `id` with `for_update()`, maps a
// missing row to a not-found error, runs `before_delete`, and then splices in
// `#hooked_delete_mutation_stmt`. The variant is chosen from
// `config.tenant_scoped`, `commit_hooks_enabled` and `config.versioned`.
// NOTE(review): only the two commit-hook variants splice `#sd_filter` into the
// load query; the other three load with a bare `find(id)`. Confirm that this
// difference is intentional.
let delete_body = if config.tenant_scoped {
// Generated prelude: `tenant_id` is `None` when `self.across_tenants` is set;
// otherwise the ambient `CURRENT_TENANT` is required and its absence is
// reported as an internal-server error.
let tenant_id_setup = quote! {
let tenant_id = if self.across_tenants {
::core::option::Option::None
} else {
let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
.ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
::core::option::Option::Some(t)
};
};
if commit_hooks_enabled {
// Variant 1: tenant scoped, commit hooks enabled.
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks};
#tenant_id_setup
Self::__autumn_register_repository_commit_hooks();
let mut conn = self.__autumn_acquire_conn().await?;
conn
.transaction::<(), ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut ctx = MutationContext::new(MutationOp::Delete);
let mut __autumn_commit_hook_discriminator: ::core::option::Option<::std::string::String> =
::core::option::Option::None;
// With an idempotency handle present, stamp the context and draw a
// per-mutation discriminator for the commit-hook row.
if let ::core::option::Option::Some(__autumn_idempotency) = &self.idempotency {
ctx.set_idempotency_key(__autumn_idempotency.scoped_key());
__autumn_commit_hook_discriminator =
::core::option::Option::Some(__autumn_idempotency.next_mutation_discriminator());
}
let load_query = #table_ident::table.find(id) #sd_filter;
// Restrict the lookup to the resolved tenant unless running across tenants.
let record = if let ::core::option::Option::Some(ref t) = tenant_id {
load_query.filter(#table_ident::tenant_id.eq(t)).for_update().first::<#model_name>(conn).await
} else {
load_query.for_update().first::<#model_name>(conn).await
}
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
self.hooks.before_delete(&mut ctx, &record).await?;
#hooked_delete_mutation_stmt
#vh_delete_in_hooks
// Enqueue the commit hook on the transaction's own connection.
let __autumn_commit_hook_record = record.__autumn_commit_hook_to_value()?;
::autumn_web::__private::enqueue_repository_commit_hook_on_conn(
conn,
Self::__autumn_repository_commit_hook_key(),
"delete",
ctx.idempotency_key.as_deref(),
__autumn_commit_hook_discriminator.as_deref(),
&ctx,
&__autumn_commit_hook_record,
)
.await?;
Ok(())
}
.scope_boxed()
})
.await?;
// Release the connection before kicking the dispatcher.
::core::mem::drop(conn);
::autumn_web::__private::kick_repository_commit_hook_dispatcher(&self.pool);
Ok(())
}
} else {
// Variant 2: tenant scoped, no commit hooks.
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks};
#tenant_id_setup
let mut conn = self.__autumn_acquire_conn().await?;
conn
.transaction::<(), ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut ctx = MutationContext::new(MutationOp::Delete);
let load_query = #table_ident::table.find(id);
let record = if let ::core::option::Option::Some(ref t) = tenant_id {
load_query.filter(#table_ident::tenant_id.eq(t)).for_update().first::<#model_name>(conn).await
} else {
load_query.for_update().first::<#model_name>(conn).await
}
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
self.hooks.before_delete(&mut ctx, &record).await?;
#hooked_delete_mutation_stmt
#vh_delete_in_hooks
Ok(())
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
Ok(())
}
}
} else if commit_hooks_enabled {
// Variant 3: not tenant scoped, commit hooks enabled.
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks};
Self::__autumn_register_repository_commit_hooks();
let mut conn = self.__autumn_acquire_conn().await?;
conn
.transaction::<(), ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut ctx = MutationContext::new(MutationOp::Delete);
let mut __autumn_commit_hook_discriminator: ::core::option::Option<::std::string::String> =
::core::option::Option::None;
if let ::core::option::Option::Some(__autumn_idempotency) = &self.idempotency {
ctx.set_idempotency_key(__autumn_idempotency.scoped_key());
__autumn_commit_hook_discriminator =
::core::option::Option::Some(__autumn_idempotency.next_mutation_discriminator());
}
let load_query = #table_ident::table.find(id) #sd_filter;
let record = load_query.for_update().first::<#model_name>(conn).await
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
self.hooks.before_delete(&mut ctx, &record).await?;
#hooked_delete_mutation_stmt
#vh_delete_in_hooks
let __autumn_commit_hook_record = record.__autumn_commit_hook_to_value()?;
::autumn_web::__private::enqueue_repository_commit_hook_on_conn(
conn,
Self::__autumn_repository_commit_hook_key(),
"delete",
ctx.idempotency_key.as_deref(),
__autumn_commit_hook_discriminator.as_deref(),
&ctx,
&__autumn_commit_hook_record,
)
.await?;
Ok(())
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
::autumn_web::__private::kick_repository_commit_hook_dispatcher(&self.pool);
Ok(())
}
} else if config.versioned {
// Variant 4: versioned only. The version-history insert is built here and
// spliced in after the delete statement. The two fragment arguments must be
// `&quote! { .. }` token streams; they had been mangled into an unterminated
// string literal.
let vh_insert = vh_insert_ts(
table_name,
"delete",
true,
&quote! { record },
None,
&quote! { conn },
model_name,
);
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks};
let mut conn = self.__autumn_acquire_conn().await?;
conn
.transaction::<(), ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut ctx = MutationContext::new(MutationOp::Delete);
let load_query = #table_ident::table.find(id);
let record = load_query.for_update().first::<#model_name>(conn).await
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
self.hooks.before_delete(&mut ctx, &record).await?;
#hooked_delete_mutation_stmt
#vh_insert
Ok(())
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
Ok(())
}
} else {
// Variant 5: no tenant scoping, no commit hooks, no version history.
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks};
let mut conn = self.__autumn_acquire_conn().await?;
conn
.transaction::<(), ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut ctx = MutationContext::new(MutationOp::Delete);
let load_query = #table_ident::table.find(id);
let record = load_query.for_update().first::<#model_name>(conn).await
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
self.hooks.before_delete(&mut ctx, &record).await?;
#hooked_delete_mutation_stmt
Ok(())
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
Ok(())
}
};
let save_many_body = {
// Generated prelude that resolves `tenant_id` for the bulk insert; empty when
// the repository is not tenant scoped. With `self.across_tenants` set, the
// ambient `CURRENT_TENANT` is used if one exists and its absence is tolerated
// (`None`). Otherwise a missing tenant context is an internal-server error.
// The binding is finally shadowed by `tenant_id.as_ref()` so that later
// fragments work with a borrowed option.
let tenant_id_setup = if config.tenant_scoped {
quote! {
let tenant_id = if self.across_tenants {
::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
} else {
let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
.ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
::core::option::Option::Some(t)
};
let tenant_id = tenant_id.as_ref();
}
} else {
quote! {}
};
// Generated expression that inserts one `chunk` and returns the inserted rows
// via `get_results`. It relies on `chunk` and `conn` (and, when tenant scoped,
// `tenant_id`) being in scope at the splice site. With a tenant present every
// item is first passed through `TenantInsertable::tenant_values(item, t)`;
// without one the chunk is inserted as-is.
let insert_expr = if config.tenant_scoped {
quote! {
{
if let ::core::option::Option::Some(t) = tenant_id {
let values: Vec<_> = chunk.iter().cloned().map(|item| ::autumn_web::tenancy::TenantInsertable::tenant_values(item, t)).collect();
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(values)
.get_results::<#model_name>(conn)
.await
} else {
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(chunk.to_vec())
.get_results::<#model_name>(conn)
.await
}
}
}
} else {
quote! {
{
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(chunk.to_vec())
.get_results::<#model_name>(conn)
.await
}
}
};
// Generated loop that runs the version-history "insert" fragment once per row
// of `chunk_inserted`; empty when the repository is not versioned. For each
// row it binds `record` and the matching `ctx` (looked up in `contexts_ref` at
// `offset + idx`) before splicing the fragment, so `chunk_inserted`, `offset`,
// `contexts_ref` and `conn` must be in scope at the splice site.
let vh_create_many_in_hooks = if config.versioned {
// The record/connection arguments are `&quote! { .. }` token streams; they had
// been mangled into an unterminated string literal.
let vh = vh_insert_ts(
table_name,
"insert",
true,
&quote! { record },
None,
&quote! { conn },
model_name,
);
quote! {
for (idx, record) in chunk_inserted.iter().enumerate() {
let global_idx = offset + idx;
let ctx = &contexts_ref[global_idx];
#vh
}
}
} else {
quote! {}
};
// Body of the generated all-or-nothing bulk insert. Both variants return early
// on empty input, run `before_create` for every input before touching the
// database, insert in chunks inside a single transaction, and run
// `after_create` once the transaction has committed. The chunk size is capped
// at 1000 rows and at 65535 bind parameters per statement.
if commit_hooks_enabled {
// Commit-hook variant: pending hook rows are enqueued in the same transaction
// as the insert and finalized (or marked failed) after each `after_create`.
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks};
use ::autumn_web::repository::AutumnColumnCountSpecific as _;
use ::autumn_web::repository::AutumnColumnCountFallback as _;
use ::autumn_web::repository::AutumnCorrelateExt as _;
if new.is_empty() {
return Ok(Vec::new());
}
#tenant_id_setup
Self::__autumn_register_repository_commit_hooks();
let mut inputs = new.to_vec();
let mut contexts = Vec::new();
for input in &mut inputs {
let mut ctx = MutationContext::new(MutationOp::Create);
if let ::core::option::Option::Some(__autumn_idempotency) = &self.idempotency {
ctx.set_idempotency_key(__autumn_idempotency.scoped_key());
}
self.hooks.before_create(&mut ctx, input).await?;
contexts.push(ctx);
}
let mut conn = self.__autumn_acquire_conn().await?;
let contexts_ref = &contexts;
let (inserted_records, hook_infos, global_indices) = conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut inserted_records = Vec::new();
let mut hook_infos = Vec::new();
let mut global_indices = Vec::new();
let mut offset = 0;
let cols = (&new[0]).__autumn_column_count() + #tenant_extra;
let chunk_size = if cols == 0 { 1000 } else { (65535usize / cols).min(1000).max(1) };
for chunk in inputs.chunks(chunk_size) {
let chunk_inserted = (#insert_expr)
.map_err(::autumn_web::AutumnError::from)?;
#vh_create_many_in_hooks
// Serialize each inserted row and draw its discriminator up front so the
// bulk enqueue below can borrow them.
let mut hook_records = Vec::new();
for record in &chunk_inserted {
let mut __autumn_commit_hook_discriminator: ::core::option::Option<::std::string::String> =
::core::option::Option::None;
if let ::core::option::Option::Some(__autumn_idempotency) = &self.idempotency {
__autumn_commit_hook_discriminator =
::core::option::Option::Some(__autumn_idempotency.next_mutation_discriminator());
}
let __autumn_commit_hook_record = record.__autumn_commit_hook_to_value()?;
hook_records.push((__autumn_commit_hook_record, __autumn_commit_hook_discriminator));
}
// Row `idx` of this chunk corresponds to context `offset + idx`.
global_indices.extend(offset..offset + chunk_inserted.len());
let hook_inputs: Vec<_> = chunk_inserted.iter().enumerate().map(|(idx, _)| {
let ctx = &contexts_ref[offset + idx];
let (ref record_val, ref discriminator) = hook_records[idx];
(
ctx.idempotency_key.clone(),
discriminator.clone(),
ctx,
record_val,
)
}).collect();
let chunk_hook_infos = ::autumn_web::__private::enqueue_repository_commit_hooks_pending_bulk_on_conn(
conn,
Self::__autumn_repository_commit_hook_key(),
"create",
&hook_inputs,
)
.await?;
for (idx, info) in chunk_hook_infos.into_iter().enumerate() {
hook_infos.push((info.0, info.1, hook_records[idx].0.clone()));
}
inserted_records.extend(chunk_inserted);
offset += chunk.len();
}
Ok((inserted_records, hook_infos, global_indices))
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
// The rows are committed from here on. Every `after_create` still runs; only
// the first error (or panic) is remembered and surfaced at the end.
let mut __autumn_first_err: ::core::option::Option<::autumn_web::AutumnError> = ::core::option::Option::None;
let mut __autumn_first_panic: ::core::option::Option<::std::boxed::Box<dyn ::core::any::Any + ::core::marker::Send>> = ::core::option::Option::None;
for (idx, record) in inserted_records.iter().enumerate() {
let global_idx = global_indices[idx];
let mut ctx = contexts[global_idx].clone();
let (hook_id, hook_owner, hook_record) = &hook_infos[idx];
let __autumn_pending_heartbeat =
::autumn_web::__private::start_repository_commit_hook_pending_finalizer_heartbeat(
self.pool.clone(),
hook_id.clone(),
hook_owner.clone(),
);
let __autumn_after_create = ::autumn_web::__private::catch_repository_after_hook_unwind(
self.hooks.after_create(&mut ctx, record)
)
.await;
match __autumn_after_create {
::core::result::Result::Ok(::core::result::Result::Ok(())) => {
let __autumn_finalize_result = ::autumn_web::__private::finalize_repository_commit_hook_after_hook(
&self.pool,
hook_id,
hook_owner,
&ctx,
hook_record,
)
.await;
__autumn_pending_heartbeat.cancel();
if let ::core::result::Result::Err(__autumn_error) = __autumn_finalize_result {
::autumn_web::reexports::tracing::warn!(
hook_id = %hook_id,
error = %__autumn_error,
"failed to finalize repository create commit hook after mutation commit; failing request closed"
);
if __autumn_first_err.is_none() {
__autumn_first_err = ::core::option::Option::Some(
::autumn_web::idempotency::__cache_committed_error_response(__autumn_error)
);
}
}
}
::core::result::Result::Ok(::core::result::Result::Err(__autumn_error)) => {
let __autumn_error_message = ::std::format!("{__autumn_error}");
::autumn_web::__private::mark_repository_commit_hook_after_hook_failed(
&self.pool,
hook_id,
hook_owner,
__autumn_error_message,
)
.await;
__autumn_pending_heartbeat.cancel();
if __autumn_first_err.is_none() {
__autumn_first_err = ::core::option::Option::Some(
::autumn_web::idempotency::__cache_committed_error_response(__autumn_error)
);
}
}
::core::result::Result::Err(__autumn_panic) => {
::autumn_web::__private::mark_repository_commit_hook_after_hook_failed(
&self.pool,
hook_id,
hook_owner,
"after_create panicked",
)
.await;
__autumn_pending_heartbeat.cancel();
if __autumn_first_panic.is_none() {
__autumn_first_panic = ::core::option::Option::Some(__autumn_panic);
}
}
}
}
::autumn_web::__private::kick_repository_commit_hook_dispatcher(&self.pool);
if let ::core::option::Option::Some(err) = __autumn_first_err {
return ::core::result::Result::Err(err);
}
// A captured panic is re-raised only after every hook has been processed.
if let ::core::option::Option::Some(panic_val) = __autumn_first_panic {
::std::panic::resume_unwind(panic_val);
}
Ok(inserted_records)
}
} else {
// Plain variant: no commit-hook bookkeeping; `after_create` is awaited
// directly and the first error is returned after all hooks have run.
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks};
use ::autumn_web::repository::AutumnColumnCountSpecific as _;
use ::autumn_web::repository::AutumnColumnCountFallback as _;
use ::autumn_web::repository::AutumnCorrelateExt as _;
if new.is_empty() {
return Ok(Vec::new());
}
#tenant_id_setup
let mut inputs = new.to_vec();
let mut contexts = Vec::new();
for input in &mut inputs {
let mut ctx = MutationContext::new(MutationOp::Create);
self.hooks.before_create(&mut ctx, input).await?;
contexts.push(ctx);
}
let mut conn = self.__autumn_acquire_conn().await?;
let contexts_ref = &contexts;
let inputs_ref = &inputs;
let inserted_records = conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut inserted = Vec::new();
let mut offset = 0;
let cols = (&new[0]).__autumn_column_count() + #tenant_extra;
let chunk_size = if cols == 0 { 1000 } else { (65535usize / cols).min(1000).max(1) };
for chunk in inputs_ref.chunks(chunk_size) {
let chunk_inserted = (#insert_expr)
.map_err(::autumn_web::AutumnError::from)?;
#vh_create_many_in_hooks
inserted.extend(chunk_inserted);
offset += chunk.len();
}
Ok(inserted)
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
let mut __autumn_first_err: ::core::option::Option<::autumn_web::AutumnError> = ::core::option::Option::None;
// Inserted row `idx` is paired with the context built for input `idx`.
for (idx, record) in inserted_records.iter().enumerate() {
let mut ctx = contexts[idx].clone();
if let ::core::result::Result::Err(err) = self.hooks.after_create(&mut ctx, record).await {
if __autumn_first_err.is_none() {
__autumn_first_err = ::core::option::Option::Some(err);
}
}
}
if let ::core::option::Option::Some(err) = __autumn_first_err {
return ::core::result::Result::Err(err);
}
Ok(inserted_records)
}
}
};
let save_many_skip_invalid_body = {
// Generated prelude that resolves `tenant_id` for the skip-invalid bulk
// insert; empty when the repository is not tenant scoped. With
// `self.across_tenants` set, the ambient `CURRENT_TENANT` is used if present
// and `None` is accepted; otherwise a missing tenant context is an
// internal-server error. The binding is then shadowed by `tenant_id.as_ref()`.
let tenant_id_setup = if config.tenant_scoped {
quote! {
let tenant_id = if self.across_tenants {
::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
} else {
let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
.ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
::core::option::Option::Some(t)
};
let tenant_id = tenant_id.as_ref();
}
} else {
quote! {}
};
// Generated expression that inserts a whole `chunk` in one statement and
// returns the inserted rows. Here every chunk element is a tuple whose `.0`
// field is the value to insert, so the values are cloned out of the tuples
// (and, with a tenant present, passed through
// `TenantInsertable::tenant_values`). Relies on `chunk`, `conn` and, when
// tenant scoped, `tenant_id` being in scope at the splice site.
let insert_expr = if config.tenant_scoped {
quote! {
{
if let ::core::option::Option::Some(t) = tenant_id {
let values: Vec<_> = chunk.iter().map(|item| ::autumn_web::tenancy::TenantInsertable::tenant_values(item.0.clone(), t)).collect();
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(values)
.get_results::<#model_name>(conn)
.await
} else {
let values: Vec<_> = chunk.iter().map(|item| item.0.clone()).collect();
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(values)
.get_results::<#model_name>(conn)
.await
}
}
}
} else {
quote! {
{
let values: Vec<_> = chunk.iter().map(|item| item.0.clone()).collect();
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(values)
.get_results::<#model_name>(conn)
.await
}
}
};
// Generated expression that inserts a single `item` (its `.0` field) and
// returns the inserted row via `get_result`. Used by the row-by-row fallback.
// Relies on `item`, `conn` and, when tenant scoped, `tenant_id` being in scope
// at the splice site.
let row_insert_expr = if config.tenant_scoped {
quote! {
if let ::core::option::Option::Some(t) = tenant_id {
let values = ::autumn_web::tenancy::TenantInsertable::tenant_values(item.0.clone(), t);
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(values)
.get_result::<#model_name>(conn)
.await
} else {
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(item.0.clone())
.get_result::<#model_name>(conn)
.await
}
}
} else {
quote! {
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(item.0.clone())
.get_result::<#model_name>(conn)
.await
}
};
// Generated statement that copies the scoped idempotency key onto `ctx` when
// `self.idempotency` is set. Only emitted with commit hooks enabled; expects a
// mutable `ctx` in scope at the splice site.
let idempotency_setup = if commit_hooks_enabled {
quote! {
if let ::core::option::Option::Some(__autumn_idempotency) = &self.idempotency {
ctx.set_idempotency_key(__autumn_idempotency.scoped_key());
}
}
} else {
quote! {}
};
// Generated call to `Self::__autumn_register_repository_commit_hooks()`;
// empty when commit hooks are disabled.
let register_commit_hooks = if commit_hooks_enabled {
quote! { Self::__autumn_register_repository_commit_hooks(); }
} else {
quote! {}
};
// Generated boolean expression: true when `batch_err` wraps a diesel
// `DatabaseError` whose kind is a unique, foreign-key, not-null or check
// violation. Shared by both variants of `skip_invalid_impl` below; expects
// `batch_err` in scope at the splice site.
let constraint_error_check = quote! {
::core::matches!(
batch_err.downcast_ref::<::autumn_web::reexports::diesel::result::Error>(),
::core::option::Option::Some(
::autumn_web::reexports::diesel::result::Error::DatabaseError(
::autumn_web::reexports::diesel::result::DatabaseErrorKind::UniqueViolation
| ::autumn_web::reexports::diesel::result::DatabaseErrorKind::ForeignKeyViolation
| ::autumn_web::reexports::diesel::result::DatabaseErrorKind::NotNullViolation
| ::autumn_web::reexports::diesel::result::DatabaseErrorKind::CheckViolation,
_
)
)
)
};
// Core of the skip-invalid bulk insert. `valid_items` (tuples of insertable,
// context, original index) is inserted chunk by chunk, each chunk in its own
// transaction. When a chunk fails with a constraint violation it is retried
// row by row, again one transaction per row, so only the offending rows end up
// in `failures`; any other batch error aborts the whole call. `after_create`
// runs for each committed row, and a failing or panicking hook moves that row
// to `failures` instead of `successes`.
// NOTE(review): unlike the all-or-nothing bulk insert, nothing spliced here
// writes version-history rows; confirm that is intended for versioned
// repositories.
let skip_invalid_impl = if commit_hooks_enabled {
quote! {
if valid_items.is_empty() {
return Ok((successes, failures));
}
let cols = (&valid_items[0].0).__autumn_column_count() + #tenant_extra;
let chunk_size = if cols == 0 { 1000 } else { (65535usize / cols).min(1000).max(1) };
for chunk in valid_items.chunks(chunk_size) {
let batch_res = conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
async move {
let chunk_inserted = (#insert_expr)
.map_err(::autumn_web::AutumnError::from)?;
let mut hook_records = Vec::new();
for record in &chunk_inserted {
let mut __autumn_commit_hook_discriminator: ::core::option::Option<::std::string::String> =
::core::option::Option::None;
if let ::core::option::Option::Some(__autumn_idempotency) = &self.idempotency {
__autumn_commit_hook_discriminator =
::core::option::Option::Some(__autumn_idempotency.next_mutation_discriminator());
}
let __autumn_commit_hook_record = record.__autumn_commit_hook_to_value()?;
hook_records.push((__autumn_commit_hook_record, __autumn_commit_hook_discriminator));
}
// Inserted row `idx` is paired with chunk element `idx`.
let hook_inputs: Vec<_> = chunk_inserted.iter().enumerate().map(|(idx, _)| {
let ctx = &chunk[idx].1;
let (ref record_val, ref discriminator) = hook_records[idx];
(
ctx.idempotency_key.clone(),
discriminator.clone(),
ctx,
record_val,
)
}).collect();
let chunk_hook_infos = ::autumn_web::__private::enqueue_repository_commit_hooks_pending_bulk_on_conn(
conn,
Self::__autumn_repository_commit_hook_key(),
"create",
&hook_inputs,
)
.await?;
let mut hook_infos = Vec::new();
for (idx, info) in chunk_hook_infos.into_iter().enumerate() {
hook_infos.push((info.0, info.1, hook_records[idx].0.clone()));
}
Ok((chunk_inserted, hook_infos))
}
.scope_boxed()
})
.await;
match batch_res {
Ok((inserted_chunk, hook_infos)) => {
for (idx, record) in inserted_chunk.into_iter().enumerate() {
let mut ctx = chunk[idx].1.clone();
let (hook_id, hook_owner, hook_record) = &hook_infos[idx];
let __autumn_pending_heartbeat =
::autumn_web::__private::start_repository_commit_hook_pending_finalizer_heartbeat(
self.pool.clone(),
hook_id.clone(),
hook_owner.clone(),
);
let __autumn_after_create = ::autumn_web::__private::catch_repository_after_hook_unwind(
self.hooks.after_create(&mut ctx, &record)
)
.await;
match __autumn_after_create {
::core::result::Result::Ok(::core::result::Result::Ok(())) => {
let __autumn_finalize_result = ::autumn_web::__private::finalize_repository_commit_hook_after_hook(
&self.pool,
hook_id,
hook_owner,
&ctx,
hook_record,
)
.await;
__autumn_pending_heartbeat.cancel();
if let ::core::result::Result::Err(__autumn_error) = __autumn_finalize_result {
::autumn_web::reexports::tracing::warn!(
hook_id = %hook_id,
error = %__autumn_error,
"failed to finalize repository create commit hook after mutation commit"
);
failures.push((chunk[idx].2, __autumn_error));
} else {
successes.push(record);
}
}
::core::result::Result::Ok(::core::result::Result::Err(__autumn_error)) => {
let __autumn_error_message = ::std::format!("{__autumn_error}");
::autumn_web::__private::mark_repository_commit_hook_after_hook_failed(
&self.pool,
hook_id,
hook_owner,
__autumn_error_message,
)
.await;
__autumn_pending_heartbeat.cancel();
::autumn_web::reexports::tracing::warn!(
hook_id = %hook_id,
error = %__autumn_error,
"after_create hook failed during skip-invalid inserts"
);
failures.push((chunk[idx].2, __autumn_error));
}
::core::result::Result::Err(__autumn_panic) => {
::autumn_web::__private::mark_repository_commit_hook_after_hook_failed(
&self.pool,
hook_id,
hook_owner,
"after_create panicked",
)
.await;
__autumn_pending_heartbeat.cancel();
::autumn_web::reexports::tracing::warn!(
hook_id = %hook_id,
"after_create hook panicked during skip-invalid inserts"
);
failures.push((chunk[idx].2, ::autumn_web::AutumnError::internal_server_error_msg("after_create hook panicked")));
}
}
}
}
Err(batch_err) => {
// Only constraint violations trigger the row-by-row retry.
let is_constraint_error = #constraint_error_check;
if !is_constraint_error {
return ::core::result::Result::Err(batch_err);
}
for item in chunk {
let row_res = conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
async move {
let record = #row_insert_expr
.map_err(::autumn_web::AutumnError::from)?;
let mut __autumn_commit_hook_discriminator: ::core::option::Option<::std::string::String> =
::core::option::Option::None;
if let ::core::option::Option::Some(__autumn_idempotency) = &self.idempotency {
__autumn_commit_hook_discriminator =
::core::option::Option::Some(__autumn_idempotency.next_mutation_discriminator());
}
let __autumn_commit_hook_record = record.__autumn_commit_hook_to_value()?;
let __autumn_hook_info = ::autumn_web::__private::enqueue_repository_commit_hook_pending_on_conn(
conn,
Self::__autumn_repository_commit_hook_key(),
"create",
item.1.idempotency_key.as_deref(),
__autumn_commit_hook_discriminator.as_deref(),
&item.1,
&__autumn_commit_hook_record,
)
.await?;
Ok((record, __autumn_hook_info.0, __autumn_hook_info.1, __autumn_commit_hook_record))
}
.scope_boxed()
})
.await;
match row_res {
Ok((record, hook_id, hook_owner, hook_record)) => {
let mut ctx = item.1.clone();
let __autumn_pending_heartbeat =
::autumn_web::__private::start_repository_commit_hook_pending_finalizer_heartbeat(
self.pool.clone(),
hook_id.clone(),
hook_owner.clone(),
);
let __autumn_after_create = ::autumn_web::__private::catch_repository_after_hook_unwind(
self.hooks.after_create(&mut ctx, &record)
)
.await;
match __autumn_after_create {
::core::result::Result::Ok(::core::result::Result::Ok(())) => {
let __autumn_finalize_result = ::autumn_web::__private::finalize_repository_commit_hook_after_hook(
&self.pool,
&hook_id,
&hook_owner,
&ctx,
&hook_record,
)
.await;
__autumn_pending_heartbeat.cancel();
if let ::core::result::Result::Err(__autumn_error) = __autumn_finalize_result {
::autumn_web::reexports::tracing::warn!(
hook_id = %hook_id,
error = %__autumn_error,
"failed to finalize repository create commit hook after mutation commit"
);
failures.push((item.2, __autumn_error));
} else {
successes.push(record);
}
}
::core::result::Result::Ok(::core::result::Result::Err(__autumn_error)) => {
let __autumn_error_message = ::std::format!("{__autumn_error}");
::autumn_web::__private::mark_repository_commit_hook_after_hook_failed(
&self.pool,
&hook_id,
&hook_owner,
__autumn_error_message,
)
.await;
__autumn_pending_heartbeat.cancel();
::autumn_web::reexports::tracing::warn!(
hook_id = %hook_id,
error = %__autumn_error,
"after_create hook failed during skip-invalid inserts"
);
failures.push((item.2, __autumn_error));
}
::core::result::Result::Err(__autumn_panic) => {
::autumn_web::__private::mark_repository_commit_hook_after_hook_failed(
&self.pool,
&hook_id,
&hook_owner,
"after_create panicked",
)
.await;
__autumn_pending_heartbeat.cancel();
::autumn_web::reexports::tracing::warn!(
hook_id = %hook_id,
"after_create hook panicked during skip-invalid inserts"
);
failures.push((item.2, ::autumn_web::AutumnError::internal_server_error_msg("after_create hook panicked")));
}
}
}
Err(err) => {
failures.push((item.2, err));
}
}
}
}
}
}
::autumn_web::__private::kick_repository_commit_hook_dispatcher(&self.pool);
}
} else {
quote! {
if valid_items.is_empty() {
return Ok((successes, failures));
}
let cols = (&valid_items[0].0).__autumn_column_count() + #tenant_extra;
let chunk_size = if cols == 0 { 1000 } else { (65535usize / cols).min(1000).max(1) };
for chunk in valid_items.chunks(chunk_size) {
let batch_res = conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
async move {
(#insert_expr)
.map_err(::autumn_web::AutumnError::from)
}
.scope_boxed()
})
.await;
match batch_res {
Ok(inserted_chunk) => {
// Inserted row `idx` is paired with chunk element `idx`.
for (idx, record) in inserted_chunk.into_iter().enumerate() {
let mut ctx = chunk[idx].1.clone();
let __autumn_after_create = ::autumn_web::__private::catch_repository_after_hook_unwind(
self.hooks.after_create(&mut ctx, &record)
)
.await;
match __autumn_after_create {
::core::result::Result::Ok(::core::result::Result::Ok(())) => {
successes.push(record);
}
::core::result::Result::Ok(::core::result::Result::Err(err)) => {
::autumn_web::reexports::tracing::warn!(
error = %err,
"after_create hook failed during skip-invalid inserts"
);
failures.push((chunk[idx].2, err));
}
::core::result::Result::Err(_panic) => {
::autumn_web::reexports::tracing::warn!(
"after_create hook panicked during skip-invalid inserts"
);
failures.push((chunk[idx].2, ::autumn_web::AutumnError::internal_server_error_msg("after_create hook panicked")));
}
}
}
}
Err(batch_err) => {
// Only constraint violations trigger the row-by-row retry.
let is_constraint_error = #constraint_error_check;
if !is_constraint_error {
return ::core::result::Result::Err(batch_err);
}
for item in chunk {
let row_res = conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
async move {
#row_insert_expr
.map_err(::autumn_web::AutumnError::from)
}
.scope_boxed()
})
.await;
match row_res {
Ok(record) => {
let mut ctx = item.1.clone();
let __autumn_after_create = ::autumn_web::__private::catch_repository_after_hook_unwind(
self.hooks.after_create(&mut ctx, &record)
)
.await;
match __autumn_after_create {
::core::result::Result::Ok(::core::result::Result::Ok(())) => {
successes.push(record);
}
::core::result::Result::Ok(::core::result::Result::Err(err)) => {
::autumn_web::reexports::tracing::warn!(
error = %err,
"after_create hook failed during skip-invalid inserts"
);
failures.push((item.2, err));
}
::core::result::Result::Err(_panic) => {
::autumn_web::reexports::tracing::warn!(
"after_create hook panicked during skip-invalid inserts"
);
failures.push((item.2, ::autumn_web::AutumnError::internal_server_error_msg("after_create hook panicked")));
}
}
}
Err(err) => {
failures.push((item.2, err));
}
}
}
}
}
}
}
};
// Final body of the generated skip-invalid bulk insert. It returns two empty
// vectors for empty input, resolves the tenant, optionally registers commit
// hooks, and runs `before_create` on a clone of every input. Inputs whose hook
// fails are recorded in `failures` under their original index; the rest are
// collected as `(item, ctx, idx)` tuples in `valid_items` and handed to
// `#skip_invalid_impl`, which fills `successes` and `failures`.
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks};
use ::autumn_web::repository::AutumnColumnCountSpecific as _;
use ::autumn_web::repository::AutumnColumnCountFallback as _;
if new.is_empty() {
return Ok((Vec::new(), Vec::new()));
}
#tenant_id_setup
#register_commit_hooks
let mut conn = self.__autumn_acquire_conn().await?;
let mut successes = Vec::new();
let mut failures = Vec::new();
let mut valid_items = Vec::new();
for (idx, original_item) in new.iter().enumerate() {
// The hook may mutate the item, so it works on a clone of the caller's input.
let mut item = original_item.clone();
let mut ctx = MutationContext::new(MutationOp::Create);
#idempotency_setup
match self.hooks.before_create(&mut ctx, &mut item).await {
Ok(()) => {
valid_items.push((item, ctx, idx));
}
Err(err) => {
failures.push((idx, err));
}
}
}
#skip_invalid_impl
Ok((successes, failures))
}
};
let update_many_body = {
// Identifier `<Model>DraftExt`, formed by appending `DraftExt` to the model
// name. NOTE(review): not referenced by the fragments visible nearby;
// presumably spliced in further down — confirm.
let draft_ext_trait = format_ident!("{}DraftExt", model_name);
// Generated prelude that resolves `tenant_id` for the bulk update; empty when
// the repository is not tenant scoped. `tenant_id` is `None` when
// `self.across_tenants` is set; otherwise the ambient `CURRENT_TENANT` is
// required and its absence is an internal-server error. The binding is then
// shadowed by `tenant_id.as_ref()`.
let tenant_id_setup = if config.tenant_scoped {
quote! {
let tenant_id = if self.across_tenants {
::core::option::Option::None
} else {
let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
.ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
::core::option::Option::Some(t)
};
let tenant_id = tenant_id.as_ref();
}
} else {
quote! {}
};
// Generated expression that loads the target rows with `for_update()`, adding
// a `tenant_id` filter when a tenant is resolved. Relies on `load_query`,
// `conn` and, when tenant scoped, `tenant_id` being in scope at the splice
// site.
let load_expr = if config.tenant_scoped {
quote! {
if let ::core::option::Option::Some(t) = tenant_id {
load_query.filter(#table_ident::tenant_id.eq(t)).for_update().load::<#model_name>(conn).await
} else {
load_query.for_update().load::<#model_name>(conn).await
}
}
} else {
quote! {
load_query.for_update().load::<#model_name>(conn).await
}
};
// Generated statement that overwrites `draft.after.tenant_id` with the
// resolved tenant when there is one; empty when the repository is not tenant
// scoped. Expects `draft` and `tenant_id` in scope at the splice site.
let tenant_assign = if config.tenant_scoped {
quote! {
if let ::core::option::Option::Some(t) = tenant_id {
draft.after.tenant_id = t.clone();
}
}
} else {
quote! {}
};
// Generated expression that applies `proposed` to `update_target` and returns
// the updated row via `get_result`. With a resolved tenant the target is
// additionally filtered on `tenant_id`. Relies on `update_target`, `proposed`,
// `conn` and, when tenant scoped, `tenant_id` being in scope at the splice
// site.
let update_expr = if config.tenant_scoped {
quote! {
if let ::core::option::Option::Some(t) = tenant_id {
::autumn_web::reexports::diesel::update(update_target.filter(#table_ident::tenant_id.eq(t)))
.set(proposed)
.get_result::<#model_name>(conn)
.await
} else {
::autumn_web::reexports::diesel::update(update_target)
.set(proposed)
.get_result::<#model_name>(conn)
.await
}
}
} else {
quote! {
::autumn_web::reexports::diesel::update(update_target)
.set(proposed)
.get_result::<#model_name>(conn)
.await
}
};
// Generated statement that copies the scoped idempotency key onto `ctx` when
// `self.idempotency` is set. Only emitted with commit hooks enabled; expects a
// mutable `ctx` in scope at the splice site.
let idempotency_setup = if commit_hooks_enabled {
quote! {
if let ::core::option::Option::Some(__autumn_idempotency) = &self.idempotency {
ctx.set_idempotency_key(__autumn_idempotency.scoped_key());
}
}
} else {
quote! {}
};
// Generated statements that enqueue pending "update" commit hooks for one
// chunk of updated rows; empty when commit hooks are disabled. Each row of
// `chunk_updated` is serialized and given a discriminator (when an idempotency
// handle exists), paired with the context at `offset + idx`, and enqueued in
// bulk on `conn`. The returned hook ids/owners are appended to `hook_infos`
// together with the serialized row. Relies on `chunk_updated`, `offset`,
// `contexts`, `hook_infos` and `conn` being in scope at the splice site.
let commit_hooks_enqueue_block = if commit_hooks_enabled {
quote! {
let mut hook_records = Vec::new();
for record in &chunk_updated {
let mut __autumn_commit_hook_discriminator: ::core::option::Option<::std::string::String> =
::core::option::Option::None;
if let ::core::option::Option::Some(__autumn_idempotency) = &self.idempotency {
__autumn_commit_hook_discriminator =
::core::option::Option::Some(__autumn_idempotency.next_mutation_discriminator());
}
let __autumn_commit_hook_record = record.__autumn_commit_hook_to_value()?;
hook_records.push((__autumn_commit_hook_record, __autumn_commit_hook_discriminator));
}
let hook_inputs: Vec<_> = chunk_updated.iter().enumerate().map(|(idx, _)| {
let global_idx = offset + idx;
let ctx = &contexts[global_idx];
let (ref record_val, ref discriminator) = hook_records[idx];
(
ctx.idempotency_key.clone(),
discriminator.clone(),
ctx,
record_val,
)
}).collect();
let chunk_hook_infos = ::autumn_web::__private::enqueue_repository_commit_hooks_pending_bulk_on_conn(
conn,
Self::__autumn_repository_commit_hook_key(),
"update",
&hook_inputs,
)
.await?;
for (idx, info) in chunk_hook_infos.into_iter().enumerate() {
hook_infos.push((info.0, info.1, hook_records[idx].0.clone()));
}
}
} else {
quote! {}
};
// Generated body of the post-transaction loop that runs `after_update` for one record.
// It relies on `idx`, `record`, `ctx`, `hook_infos`, `__autumn_first_err` and
// `__autumn_first_panic` from the splice site. Only the first error and the first
// panic payload are retained; later ones are dropped so every record still gets its hook.
let after_update_hook_block = if commit_hooks_enabled {
quote! {
// Commit-hook variant: a heartbeat is started for the pending hook, the hook is run
// with unwind catching, and the pending hook is then finalised or marked as failed.
let (hook_id, hook_owner, hook_record) = &hook_infos[idx];
let __autumn_pending_heartbeat =
::autumn_web::__private::start_repository_commit_hook_pending_finalizer_heartbeat(
self.pool.clone(),
hook_id.clone(),
hook_owner.clone(),
);
let __autumn_after_update = ::autumn_web::__private::catch_repository_after_hook_unwind(
self.hooks.after_update(&mut ctx, record)
)
.await;
match __autumn_after_update {
// Hook returned Ok: finalise the pending commit hook.
::core::result::Result::Ok(::core::result::Result::Ok(())) => {
let __autumn_finalize_result = ::autumn_web::__private::finalize_repository_commit_hook_after_hook(
&self.pool,
hook_id,
hook_owner,
&ctx,
hook_record,
)
.await;
__autumn_pending_heartbeat.cancel();
if let ::core::result::Result::Err(__autumn_error) = __autumn_finalize_result {
::autumn_web::reexports::tracing::warn!(
hook_id = %hook_id,
error = %__autumn_error,
"failed to finalize repository update commit hook after mutation commit; failing request closed"
);
if __autumn_first_err.is_none() {
__autumn_first_err = ::core::option::Option::Some(
::autumn_web::idempotency::__cache_committed_error_response(__autumn_error)
);
}
}
}
// Hook returned Err: record the failure message on the pending hook.
::core::result::Result::Ok(::core::result::Result::Err(__autumn_error)) => {
let __autumn_error_message = ::std::format!("{__autumn_error}");
::autumn_web::__private::mark_repository_commit_hook_after_hook_failed(
&self.pool,
hook_id,
hook_owner,
__autumn_error_message,
)
.await;
__autumn_pending_heartbeat.cancel();
if __autumn_first_err.is_none() {
__autumn_first_err = ::core::option::Option::Some(
::autumn_web::idempotency::__cache_committed_error_response(__autumn_error)
);
}
}
// Hook panicked: mark the hook failed and keep the payload so the caller can re-raise it.
::core::result::Result::Err(__autumn_panic) => {
::autumn_web::__private::mark_repository_commit_hook_after_hook_failed(
&self.pool,
hook_id,
hook_owner,
"after_update panicked",
)
.await;
__autumn_pending_heartbeat.cancel();
if __autumn_first_panic.is_none() {
__autumn_first_panic = ::core::option::Option::Some(__autumn_panic);
}
}
}
}
} else {
quote! {
// Variant without commit hooks: run the hook with unwind catching and only
// remember the first error / first panic payload.
let __autumn_after_update = ::autumn_web::__private::catch_repository_after_hook_unwind(
self.hooks.after_update(&mut ctx, record)
)
.await;
match __autumn_after_update {
::core::result::Result::Ok(::core::result::Result::Ok(())) => {}
::core::result::Result::Ok(::core::result::Result::Err(__autumn_error)) => {
if __autumn_first_err.is_none() {
__autumn_first_err = ::core::option::Option::Some(__autumn_error);
}
}
::core::result::Result::Err(__autumn_panic) => {
if __autumn_first_panic.is_none() {
__autumn_first_panic = ::core::option::Option::Some(__autumn_panic);
}
}
}
}
};
// Generated statement that calls `kick_repository_commit_hook_dispatcher` with the
// repository pool. Spliced after the after-update loop; empty when commit hooks
// are disabled.
let kick_dispatcher_block = if commit_hooks_enabled {
quote! {
::autumn_web::__private::kick_repository_commit_hook_dispatcher(&self.pool);
}
} else {
quote! {}
};
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks, UpdateDraft};
use ::autumn_web::repository::{AutumnLockVersionModelExt as _, AutumnLockVersionUpdateExt as _};
if ids.is_empty() {
return Ok(Vec::new());
}
#tenant_id_setup
let mut conn = self.__autumn_acquire_conn().await?;
let (updated_records, contexts, hook_infos) = conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut current_rows = Vec::new();
for chunk in ids.chunks(1000) {
let load_query = #table_ident::table.filter(#table_ident::id.eq_any(chunk))
.order(#table_ident::id.asc());
let chunk_rows = #load_expr
.map_err(::autumn_web::AutumnError::from)?;
current_rows.extend(chunk_rows);
}
if let ::core::option::Option::Some(expected_version) =
changes.__autumn_lock_version_expected()
{
for current in ¤t_rows {
if let ::core::option::Option::Some(actual_version) =
current.__autumn_lock_version_actual()
{
if actual_version != expected_version {
return Err(::autumn_web::AutumnError::conflict(
::autumn_web::RepositoryError::Conflict {
id: current.id,
expected_version,
actual_version: ::core::option::Option::Some(actual_version),
},
));
}
}
}
}
let mut proposed_rows = Vec::new();
let mut contexts = Vec::new();
for current in ¤t_rows {
let mut ctx = MutationContext::new(MutationOp::Update);
#idempotency_setup
let mut draft = <UpdateDraft<#model_name> as #draft_ext_trait>::from_patch(current, changes)?;
#tenant_assign
self.hooks.before_update(&mut ctx, &mut draft).await?;
#tenant_assign
proposed_rows.push(draft.into_after());
contexts.push(ctx);
}
let mut updated_records = Vec::new();
let mut hook_infos: ::std::vec::Vec<(::std::string::String, ::std::string::String, ::serde_json::Value)> = ::std::vec::Vec::new();
let mut offset = 0;
for chunk in proposed_rows.chunks(1000) {
let mut chunk_updated = Vec::new();
for proposed in chunk {
let update_target = #table_ident::table.find(proposed.id);
let updated = #update_expr
.map_err(::autumn_web::AutumnError::from)?;
chunk_updated.push(updated);
}
#commit_hooks_enqueue_block
updated_records.extend(chunk_updated);
offset += chunk.len();
}
Ok((updated_records, contexts, hook_infos))
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
let mut __autumn_first_err: ::core::option::Option<::autumn_web::AutumnError> = ::core::option::Option::None;
let mut __autumn_first_panic: ::core::option::Option<::std::boxed::Box<dyn ::core::any::Any + ::core::marker::Send>> = ::core::option::Option::None;
for (idx, record) in updated_records.iter().enumerate() {
let mut ctx = contexts[idx].clone();
#after_update_hook_block
}
#kick_dispatcher_block
if let ::core::option::Option::Some(err) = __autumn_first_err {
return ::core::result::Result::Err(err);
}
if let ::core::option::Option::Some(panic_val) = __autumn_first_panic {
::std::panic::resume_unwind(panic_val);
}
Ok(updated_records)
}
};
let delete_many_body = {
let tenant_id_setup = if config.tenant_scoped {
quote! {
let tenant_id = if self.across_tenants {
::core::option::Option::None
} else {
let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
.ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
::core::option::Option::Some(t)
};
let tenant_id = tenant_id.as_ref();
}
} else {
quote! {}
};
let load_expr = if config.tenant_scoped {
if config.soft_delete {
quote! {
if let ::core::option::Option::Some(t) = tenant_id {
load_query.filter(#table_ident::tenant_id.eq(t)).filter(#table_ident::deleted_at.is_null()).for_update().load::<#model_name>(conn).await
} else {
load_query.filter(#table_ident::deleted_at.is_null()).for_update().load::<#model_name>(conn).await
}
}
} else {
quote! {
if let ::core::option::Option::Some(t) = tenant_id {
load_query.filter(#table_ident::tenant_id.eq(t)).for_update().load::<#model_name>(conn).await
} else {
load_query.for_update().load::<#model_name>(conn).await
}
}
}
} else {
if config.soft_delete {
quote! {
load_query.filter(#table_ident::deleted_at.is_null()).for_update().load::<#model_name>(conn).await
}
} else {
quote! {
load_query.for_update().load::<#model_name>(conn).await
}
}
};
let delete_expr = if config.soft_delete {
if config.tenant_scoped {
quote! {
let query = #table_ident::table.filter(#table_ident::id.eq_any(chunk)).filter(#table_ident::deleted_at.is_null());
if let ::core::option::Option::Some(t) = tenant_id {
::autumn_web::reexports::diesel::update(query.filter(#table_ident::tenant_id.eq(t)))
.set(#table_ident::deleted_at.eq(::core::option::Option::Some(__now)))
.execute(conn)
.await
} else {
::autumn_web::reexports::diesel::update(query)
.set(#table_ident::deleted_at.eq(::core::option::Option::Some(__now)))
.execute(conn)
.await
}
}
} else {
quote! {
::autumn_web::reexports::diesel::update(#table_ident::table.filter(#table_ident::id.eq_any(chunk)).filter(#table_ident::deleted_at.is_null()))
.set(#table_ident::deleted_at.eq(::core::option::Option::Some(__now)))
.execute(conn)
.await
}
}
} else {
if config.tenant_scoped {
quote! {
let query = #table_ident::table.filter(#table_ident::id.eq_any(chunk));
if let ::core::option::Option::Some(t) = tenant_id {
::autumn_web::reexports::diesel::delete(query.filter(#table_ident::tenant_id.eq(t)))
.execute(conn)
.await
} else {
::autumn_web::reexports::diesel::delete(query)
.execute(conn)
.await
}
}
} else {
quote! {
::autumn_web::reexports::diesel::delete(#table_ident::table.filter(#table_ident::id.eq_any(chunk)))
.execute(conn)
.await
}
}
};
let delete_returning_expr = if config.soft_delete {
if config.tenant_scoped {
quote! {
let query = #table_ident::table.filter(#table_ident::id.eq_any(chunk)).filter(#table_ident::deleted_at.is_null());
if let ::core::option::Option::Some(t) = tenant_id {
::autumn_web::reexports::diesel::update(query.filter(#table_ident::tenant_id.eq(t)))
.set(#table_ident::deleted_at.eq(::core::option::Option::Some(__now)))
.returning(#table_ident::id)
.get_results::<i64>(conn)
.await
} else {
::autumn_web::reexports::diesel::update(query)
.set(#table_ident::deleted_at.eq(::core::option::Option::Some(__now)))
.returning(#table_ident::id)
.get_results::<i64>(conn)
.await
}
}
} else {
quote! {
::autumn_web::reexports::diesel::update(#table_ident::table.filter(#table_ident::id.eq_any(chunk)).filter(#table_ident::deleted_at.is_null()))
.set(#table_ident::deleted_at.eq(::core::option::Option::Some(__now)))
.returning(#table_ident::id)
.get_results::<i64>(conn)
.await
}
}
} else {
if config.tenant_scoped {
quote! {
let query = #table_ident::table.filter(#table_ident::id.eq_any(chunk));
if let ::core::option::Option::Some(t) = tenant_id {
::autumn_web::reexports::diesel::delete(query.filter(#table_ident::tenant_id.eq(t)))
.returning(#table_ident::id)
.get_results::<i64>(conn)
.await
} else {
::autumn_web::reexports::diesel::delete(query)
.returning(#table_ident::id)
.get_results::<i64>(conn)
.await
}
}
} else {
quote! {
::autumn_web::reexports::diesel::delete(#table_ident::table.filter(#table_ident::id.eq_any(chunk)))
.returning(#table_ident::id)
.get_results::<i64>(conn)
.await
}
}
};
let vh_delete_write = if config.versioned {
let vh = vh_insert_ts(
table_name,
"delete",
false,
"e! { r },
None,
"e! { conn },
model_name,
);
quote! {
for r in &__vh_deleted_records {
#vh
}
}
} else {
quote! {}
};
let delete_execution = if config.versioned {
quote! {
let mut __vh_actually_deleted: ::std::collections::HashSet<i64> = ::std::collections::HashSet::new();
for chunk in ids.chunks(1000) {
let chunk_deleted_ids = #delete_returning_expr
.map_err(::autumn_web::AutumnError::from)?;
__vh_actually_deleted.extend(chunk_deleted_ids);
}
let mut __vh_deleted_records = current_rows.clone();
__vh_deleted_records.retain(|r| __vh_actually_deleted.contains(&r.id));
#vh_delete_write
}
} else {
quote! {
for chunk in ids.chunks(1000) {
#delete_expr
.map_err(::autumn_web::AutumnError::from)?;
}
}
};
let idempotency_setup = if commit_hooks_enabled {
quote! {
if let ::core::option::Option::Some(__autumn_idempotency) = &self.idempotency {
ctx.set_idempotency_key(__autumn_idempotency.scoped_key());
}
}
} else {
quote! {}
};
let delete_commit_hook_setup = if commit_hooks_enabled {
quote! {
let mut __autumn_commit_hook_discriminator: ::core::option::Option<::std::string::String> =
::core::option::Option::None;
if let ::core::option::Option::Some(__autumn_idempotency) = &self.idempotency {
__autumn_commit_hook_discriminator =
::core::option::Option::Some(__autumn_idempotency.next_mutation_discriminator());
}
let __autumn_commit_hook_record = record.__autumn_commit_hook_to_value()?;
::autumn_web::__private::enqueue_repository_commit_hook_on_conn(
conn,
Self::__autumn_repository_commit_hook_key(),
"delete",
ctx.idempotency_key.as_deref(),
__autumn_commit_hook_discriminator.as_deref(),
&ctx,
&__autumn_commit_hook_record,
)
.await?;
}
} else {
quote! {}
};
let kick_dispatcher = if commit_hooks_enabled {
quote! {
::autumn_web::__private::kick_repository_commit_hook_dispatcher(&self.pool);
}
} else {
quote! {}
};
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::hooks::{MutationContext, MutationOp, MutationHooks};
if ids.is_empty() {
return Ok(());
}
#tenant_id_setup
let mut conn = self.__autumn_acquire_conn().await?;
let __now = ::autumn_web::reexports::chrono::Utc::now().naive_utc();
conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut current_rows = Vec::new();
for chunk in ids.chunks(1000) {
let load_query = #table_ident::table.filter(#table_ident::id.eq_any(chunk))
.order(#table_ident::id.asc());
let chunk_rows = #load_expr
.map_err(::autumn_web::AutumnError::from)?;
current_rows.extend(chunk_rows);
}
for record in ¤t_rows {
let mut ctx = MutationContext::new(MutationOp::Delete);
#idempotency_setup
self.hooks.before_delete(&mut ctx, record).await?;
#delete_commit_hook_setup
}
#delete_execution
Ok(())
}
.scope_boxed()
})
.await?;
::core::mem::drop(conn);
#kick_dispatcher
Ok(())
}
};
// Placeholder body for this branch: the generated code is `unreachable!` with a
// message stating that `upsert_many` is not available when hooks are configured.
let upsert_many_body = quote! {
unreachable!("upsert_many is not available when hooks are configured")
};
// Value of this branch: all generated fragments, returned as one tuple.
// The `else` branch that follows defines `let` bindings with the same leading names
// (`struct_fields`, `clone_impl`, `extractor_init`, `save_body`, `update_body`, ...).
(
struct_fields,
clone_impl,
extractor_init,
save_body,
update_body,
delete_body,
hook_support_methods,
hook_inventory_registration,
save_many_body,
save_many_skip_invalid_body,
update_many_body,
delete_many_body,
upsert_many_body,
)
} else {
// Field list of the generated repository struct in this branch: the connection
// pool, the optional tenant field fragment, the read route, the statement timeout
// in milliseconds, the slow-query threshold and the optional route string.
let struct_fields = quote! {
pool: ::autumn_web::reexports::diesel_async::pooled_connection::deadpool::Pool<
::autumn_web::reexports::diesel_async::AsyncPgConnection,
>,
#tenant_struct_field
__autumn_read_route: ::autumn_web::repository::ReadRoute,
__autumn_statement_timeout_ms: u64,
__autumn_slow_threshold: ::std::time::Duration,
__autumn_route: ::std::option::Option<::std::string::String>,
};
// Hand-written `Clone` impl for the generated repository type; it lists the same
// fields as `struct_fields` above. The two plain-value fields (timeout, slow
// threshold) are copied, the others are cloned.
let clone_impl = quote! {
impl ::core::clone::Clone for #pg_name {
fn clone(&self) -> Self {
Self {
pool: self.pool.clone(),
#tenant_clone_field
__autumn_read_route: self.__autumn_read_route.clone(),
__autumn_statement_timeout_ms: self.__autumn_statement_timeout_ms,
__autumn_slow_threshold: self.__autumn_slow_threshold,
__autumn_route: self.__autumn_route.clone(),
}
}
}
};
// Generated extractor prelude that derives per-request settings from `_parts` and `state`:
// - statement timeout in ms: the `StatementTimeout` request extension if present,
//   otherwise `state.statement_timeout()`, otherwise 0; millisecond values that do
//   not fit in `u64` saturate, and the result is capped at `i32::MAX`;
// - slow-query threshold from `state.slow_query_threshold()`;
// - the matched route path, if a `MatchedPath` extension exists;
// followed by the read-route initialisation fragment.
let timeout_route_init = quote! {
use ::autumn_web::db::DbState as _;
const __AUTUMN_PG_TIMEOUT_MAX_MS: u64 = i32::MAX as u64;
let __autumn_timeout_ms: u64 = _parts
.extensions
.get::<::autumn_web::db::StatementTimeout>()
.map(|t| ::std::convert::TryFrom::try_from(t.0.as_millis()).unwrap_or(u64::MAX))
.or_else(|| state.statement_timeout().map(|d| ::std::convert::TryFrom::try_from(d.as_millis()).unwrap_or(u64::MAX)))
.unwrap_or(0u64)
.min(__AUTUMN_PG_TIMEOUT_MAX_MS);
let __autumn_slow_threshold = state.slow_query_threshold();
let __autumn_route: ::std::option::Option<::std::string::String> = _parts
.extensions
.get::<::autumn_web::reexports::axum::extract::MatchedPath>()
.map(|p| p.as_str().to_owned());
#read_route_init
};
// Generated tail of the extractor: runs the timeout/route prelude and then builds
// the repository value from `pool` and the locals that the prelude defined.
let extractor_init = quote! {
#timeout_route_init
Ok(#pg_name {
pool,
#tenant_init_field
__autumn_read_route,
__autumn_statement_timeout_ms: __autumn_timeout_ms,
__autumn_slow_threshold,
__autumn_route,
})
};
// Body template for the single-row insert. Four variants:
//   * tenant-scoped + versioned: insert with tenant values inside a transaction and
//     write a version-history entry in the same transaction;
//   * tenant-scoped only: insert with tenant values, no transaction;
//   * versioned only: insert plus version-history entry inside a transaction;
//   * neither: plain insert.
// In the tenant-scoped variants a missing tenant context is an error unless
// `self.across_tenants` is set; with `across_tenants` the tenant is used when one
// exists and the row is inserted as given otherwise.
// NOTE(review): the token fragments passed to `vh_insert_ts` name the generated
// `record` and `conn` bindings - confirm the parameter meaning against `vh_insert_ts`.
// They are `&quote! { ... }` borrows; these tokens were mis-encoded in the previous
// revision.
let save_body = if config.tenant_scoped && config.versioned {
let vh_insert = vh_insert_ts(
table_name,
"insert",
false,
&quote! { record },
None,
&quote! { conn },
model_name,
);
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
let tenant_id = if self.across_tenants {
::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
} else {
let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
.ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
::core::option::Option::Some(t)
};
let mut conn = self.__autumn_acquire_conn().await?;
conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| async move {
let record = if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(::autumn_web::tenancy::TenantInsertable::tenant_values(new.clone(), t))
.get_result::<#model_name>(conn)
.await
} else {
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(new.clone())
.get_result::<#model_name>(conn)
.await
}
.map_err(::autumn_web::AutumnError::from)?;
#vh_insert
Ok(record)
}.scope_boxed())
.await
}
} else if config.tenant_scoped {
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
let tenant_id = if self.across_tenants {
::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
} else {
let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
.ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
::core::option::Option::Some(t)
};
let mut conn = self.__autumn_acquire_conn().await?;
if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(::autumn_web::tenancy::TenantInsertable::tenant_values(new.clone(), t))
.get_result::<#model_name>(&mut conn)
.await
} else {
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(new.clone())
.get_result::<#model_name>(&mut conn)
.await
}
.map_err(::autumn_web::AutumnError::from)
}
} else if config.versioned {
let vh_insert = vh_insert_ts(
table_name,
"insert",
false,
&quote! { record },
None,
&quote! { conn },
model_name,
);
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
let mut conn = self.__autumn_acquire_conn().await?;
conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| async move {
let record = ::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(new.clone())
.get_result::<#model_name>(conn)
.await
.map_err(::autumn_web::AutumnError::from)?;
#vh_insert
Ok(record)
}.scope_boxed())
.await
}
} else {
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
let mut conn = self.__autumn_acquire_conn().await?;
::autumn_web::reexports::diesel::insert_into(#table_ident::table)
.values(new.clone())
.get_result::<#model_name>(&mut conn)
.await
.map_err(::autumn_web::AutumnError::from)
}
};
let update_body = if config.tenant_scoped && config.versioned {
let vh_insert = vh_insert_ts(
table_name,
"update",
false,
"e! { record },
Some("e! { current }),
"e! { conn },
model_name,
);
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::repository::{AutumnLockVersionModelExt as _, AutumnLockVersionUpdateExt as _, CanSetTenantId as _};
let tenant_id = if self.across_tenants {
::core::option::Option::None
} else {
let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
.ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
::core::option::Option::Some(t)
};
let mut conn = self.__autumn_acquire_conn().await?;
conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
async move {
let load_query = #table_ident::table.find(id);
let current = if let ::core::option::Option::Some(expected_version) =
changes.__autumn_lock_version_expected()
{
let c = if let ::core::option::Option::Some(ref t) = tenant_id {
load_query.filter(#table_ident::tenant_id.eq(t)).for_update().first::<#model_name>(conn).await
} else {
load_query.for_update().first::<#model_name>(conn).await
}
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
if let ::core::option::Option::Some(actual_version) = c.__autumn_lock_version_actual() {
if actual_version != expected_version {
return Err(::autumn_web::AutumnError::conflict(
::autumn_web::RepositoryError::Conflict {
id,
expected_version,
actual_version: ::core::option::Option::Some(actual_version),
},
));
}
}
c
} else {
if let ::core::option::Option::Some(ref t) = tenant_id {
load_query.filter(#table_ident::tenant_id.eq(t)).for_update().first::<#model_name>(conn).await
} else {
load_query.for_update().first::<#model_name>(conn).await
}
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?
};
let mut diesel_changeset = changes.__to_changeset();
if let ::core::option::Option::Some(ref t) = tenant_id {
diesel_changeset.set_tenant_id(t.clone());
}
let update_target = #table_ident::table.find(id);
let record = if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::update(update_target.filter(#table_ident::tenant_id.eq(t)))
.set(diesel_changeset)
.get_result::<#model_name>(conn)
.await
} else {
::autumn_web::reexports::diesel::update(update_target)
.set(diesel_changeset)
.get_result::<#model_name>(conn)
.await
}
.map_err(::autumn_web::AutumnError::from)?;
#vh_insert
Ok(record)
}
.scope_boxed()
})
.await
}
} else if config.tenant_scoped {
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::repository::{AutumnLockVersionModelExt as _, AutumnLockVersionUpdateExt as _};
let tenant_id = if self.across_tenants {
::core::option::Option::None
} else {
let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
.ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
::core::option::Option::Some(t)
};
let mut conn = self.__autumn_acquire_conn().await?;
if let ::core::option::Option::Some(expected_version) =
changes.__autumn_lock_version_expected()
{
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
async move {
let load_query = #table_ident::table.find(id);
let current = if let ::core::option::Option::Some(ref t) = tenant_id {
load_query.filter(#table_ident::tenant_id.eq(t)).for_update().first::<#model_name>(conn).await
} else {
load_query.for_update().first::<#model_name>(conn).await
}
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
if let ::core::option::Option::Some(actual_version) =
current.__autumn_lock_version_actual()
{
if actual_version != expected_version {
return Err(::autumn_web::AutumnError::conflict(
::autumn_web::RepositoryError::Conflict {
id,
expected_version,
actual_version: ::core::option::Option::Some(actual_version),
},
));
}
}
let mut diesel_changeset = changes.__to_changeset();
if let ::core::option::Option::Some(ref t) = tenant_id {
use ::autumn_web::repository::CanSetTenantId as _;
diesel_changeset.set_tenant_id(t.clone());
}
let update_target = #table_ident::table.find(id);
if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::update(update_target.filter(#table_ident::tenant_id.eq(t)))
.set(diesel_changeset)
.get_result::<#model_name>(conn)
.await
} else {
::autumn_web::reexports::diesel::update(update_target)
.set(diesel_changeset)
.get_result::<#model_name>(conn)
.await
}
.map_err(::autumn_web::AutumnError::from)
}
.scope_boxed()
})
.await
} else {
let mut diesel_changeset = changes.__to_changeset();
if let ::core::option::Option::Some(ref t) = tenant_id {
use ::autumn_web::repository::CanSetTenantId as _;
diesel_changeset.set_tenant_id(t.clone());
}
let update_target = #table_ident::table.find(id);
if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::update(update_target.filter(#table_ident::tenant_id.eq(t)))
.set(diesel_changeset)
.get_result::<#model_name>(&mut conn)
.await
} else {
::autumn_web::reexports::diesel::update(update_target)
.set(diesel_changeset)
.get_result::<#model_name>(&mut conn)
.await
}
.map_err(::autumn_web::AutumnError::from)
}
}
} else if config.versioned {
let vh_insert = vh_insert_ts(
table_name,
"update",
false,
"e! { record },
Some("e! { current }),
"e! { conn },
model_name,
);
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::repository::{AutumnLockVersionModelExt as _, AutumnLockVersionUpdateExt as _};
let mut conn = self.__autumn_acquire_conn().await?;
conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
async move {
let load_query = #table_ident::table.find(id);
let current = if let ::core::option::Option::Some(expected_version) =
changes.__autumn_lock_version_expected()
{
let c = load_query.for_update().first::<#model_name>(conn).await
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
if let ::core::option::Option::Some(actual_version) =
c.__autumn_lock_version_actual()
{
if actual_version != expected_version {
return Err(::autumn_web::AutumnError::conflict(
::autumn_web::RepositoryError::Conflict {
id,
expected_version,
actual_version: ::core::option::Option::Some(actual_version),
},
));
}
}
c
} else {
load_query.for_update().first::<#model_name>(conn).await
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?
};
let diesel_changeset = changes.__to_changeset();
let update_target = #table_ident::table.find(id);
let record = ::autumn_web::reexports::diesel::update(update_target)
.set(diesel_changeset)
.get_result::<#model_name>(conn)
.await
.map_err(::autumn_web::AutumnError::from)?;
#vh_insert
Ok(record)
}
.scope_boxed()
})
.await
}
} else {
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::repository::{AutumnLockVersionModelExt as _, AutumnLockVersionUpdateExt as _};
let mut conn = self.__autumn_acquire_conn().await?;
if let ::core::option::Option::Some(expected_version) =
changes.__autumn_lock_version_expected()
{
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
async move {
let load_query = #table_ident::table.find(id);
let current = load_query.for_update().first::<#model_name>(conn).await
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
if let ::core::option::Option::Some(actual_version) =
current.__autumn_lock_version_actual()
{
if actual_version != expected_version {
return Err(::autumn_web::AutumnError::conflict(
::autumn_web::RepositoryError::Conflict {
id,
expected_version,
actual_version: ::core::option::Option::Some(actual_version),
},
));
}
}
let diesel_changeset = changes.__to_changeset();
let update_target = #table_ident::table.find(id);
::autumn_web::reexports::diesel::update(update_target)
.set(diesel_changeset)
.get_result::<#model_name>(conn)
.await
.map_err(::autumn_web::AutumnError::from)
}
.scope_boxed()
})
.await
} else {
let diesel_changeset = changes.__to_changeset();
let update_target = #table_ident::table.find(id);
::autumn_web::reexports::diesel::update(update_target)
.set(diesel_changeset)
.get_result::<#model_name>(&mut conn)
.await
.map_err(::autumn_web::AutumnError::from)
}
}
};
let delete_body = if config.tenant_scoped {
let tenant_id_setup = quote! {
let tenant_id = if self.across_tenants {
::core::option::Option::None
} else {
let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
.ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
::core::option::Option::Some(t)
};
};
if config.soft_delete && config.versioned {
let vh_insert = vh_insert_ts(
table_name,
"delete",
false,
"e! { record },
None,
"e! { conn },
model_name,
);
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
#tenant_id_setup
let __now = ::autumn_web::reexports::chrono::Utc::now().naive_utc();
let mut conn = self.__autumn_acquire_conn().await?;
conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| async move {
let load_query = #table_ident::table.find(id).filter(#table_ident::deleted_at.is_null());
let record = if let ::core::option::Option::Some(ref t) = tenant_id {
load_query.filter(#table_ident::tenant_id.eq(t)).for_update().first::<#model_name>(conn).await
} else {
load_query.for_update().first::<#model_name>(conn).await
}
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
let delete_query = #table_ident::table.find(id).filter(#table_ident::deleted_at.is_null());
let __count = if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::update(delete_query.filter(#table_ident::tenant_id.eq(t)))
.set(#table_ident::deleted_at.eq(::core::option::Option::Some(__now)))
.execute(conn)
.await
} else {
::autumn_web::reexports::diesel::update(delete_query)
.set(#table_ident::deleted_at.eq(::core::option::Option::Some(__now)))
.execute(conn)
.await
}
.map_err(::autumn_web::AutumnError::from)?;
if __count == 0 {
return Err(::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
));
}
#vh_insert
Ok(())
}.scope_boxed())
.await
}
} else if config.soft_delete {
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
#tenant_id_setup
let __now = ::autumn_web::reexports::chrono::Utc::now().naive_utc();
let mut conn = self.__autumn_acquire_conn().await?;
let delete_query = #table_ident::table.find(id).filter(#table_ident::deleted_at.is_null());
let __count = if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::update(delete_query.filter(#table_ident::tenant_id.eq(t)))
.set(#table_ident::deleted_at.eq(::core::option::Option::Some(__now)))
.execute(&mut conn)
.await
} else {
::autumn_web::reexports::diesel::update(delete_query)
.set(#table_ident::deleted_at.eq(::core::option::Option::Some(__now)))
.execute(&mut conn)
.await
}
.map_err(::autumn_web::AutumnError::from)?;
if __count == 0 {
return Err(::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
));
}
Ok(())
}
} else if config.versioned {
let vh_insert = vh_insert_ts(
table_name,
"delete",
false,
"e! { record },
None,
"e! { conn },
model_name,
);
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
#tenant_id_setup
let mut conn = self.__autumn_acquire_conn().await?;
conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| async move {
let load_query = #table_ident::table.find(id);
let record = if let ::core::option::Option::Some(ref t) = tenant_id {
load_query.filter(#table_ident::tenant_id.eq(t)).for_update().first::<#model_name>(conn).await
} else {
load_query.for_update().first::<#model_name>(conn).await
}
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
let delete_query = #table_ident::table.find(id);
let __count = if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::delete(delete_query.filter(#table_ident::tenant_id.eq(t)))
.execute(conn)
.await
} else {
::autumn_web::reexports::diesel::delete(delete_query)
.execute(conn)
.await
}
.map_err(::autumn_web::AutumnError::from)?;
if __count == 0 {
return Err(::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
));
}
#vh_insert
Ok(())
}.scope_boxed())
.await
}
} else {
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
#tenant_id_setup
let mut conn = self.__autumn_acquire_conn().await?;
let delete_query = #table_ident::table.find(id);
let __count = if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::delete(delete_query.filter(#table_ident::tenant_id.eq(t)))
.execute(&mut conn)
.await
} else {
::autumn_web::reexports::diesel::delete(delete_query)
.execute(&mut conn)
.await
}
.map_err(::autumn_web::AutumnError::from)?;
if __count == 0 {
return Err(::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
));
}
Ok(())
}
}
} else if config.soft_delete {
if config.versioned {
let vh_insert = vh_insert_ts(
table_name,
"delete",
false,
"e! { record },
None,
"e! { conn },
model_name,
);
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
let __now = ::autumn_web::reexports::chrono::Utc::now().naive_utc();
let mut conn = self.__autumn_acquire_conn().await?;
conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| async move {
let record = #table_ident::table.find(id)
.filter(#table_ident::deleted_at.is_null())
.for_update()
.first::<#model_name>(conn)
.await
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
let __count = ::autumn_web::reexports::diesel::update(
#table_ident::table.find(id).filter(#table_ident::deleted_at.is_null())
)
.set(#table_ident::deleted_at.eq(::core::option::Option::Some(__now)))
.execute(conn)
.await
.map_err(::autumn_web::AutumnError::from)?;
if __count == 0 {
return Err(::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
));
}
#vh_insert
Ok(())
}.scope_boxed())
.await
}
} else {
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
let __now = ::autumn_web::reexports::chrono::Utc::now().naive_utc();
let mut conn = self.__autumn_acquire_conn().await?;
let delete_query = #table_ident::table.find(id).filter(#table_ident::deleted_at.is_null());
let __count = ::autumn_web::reexports::diesel::update(delete_query)
.set(#table_ident::deleted_at.eq(::core::option::Option::Some(__now)))
.execute(&mut conn)
.await
.map_err(::autumn_web::AutumnError::from)?;
if __count == 0 {
return Err(::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
));
}
Ok(())
}
}
} else if config.versioned {
let vh_insert = vh_insert_ts(
table_name,
"delete",
false,
"e! { record },
None,
"e! { conn },
model_name,
);
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
let mut conn = self.__autumn_acquire_conn().await?;
conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| async move {
let record = #table_ident::table.find(id)
.for_update()
.first::<#model_name>(conn)
.await
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
let __count = ::autumn_web::reexports::diesel::delete(#table_ident::table.find(id))
.execute(conn)
.await
.map_err(::autumn_web::AutumnError::from)?;
if __count == 0 {
return Err(::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
));
}
#vh_insert
Ok(())
}.scope_boxed())
.await
}
} else {
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
let mut conn = self.__autumn_acquire_conn().await?;
let delete_query = #table_ident::table.find(id);
let __count = ::autumn_web::reexports::diesel::delete(delete_query)
.execute(&mut conn)
.await
.map_err(::autumn_web::AutumnError::from)?;
if __count == 0 {
return Err(::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
));
}
Ok(())
}
};
// Token stream for the body of the generated bulk-insert method (the method
// itself is assembled outside this block; the body binds `new` and returns the
// inserted models).
//
// The tenant and version-history concerns are independent, so each one is built
// as its own fragment and spliced into a single template instead of spelling
// out all four flag combinations by hand.
let save_many_body = {
    // Per-chunk version-history writes; expands to nothing when the repository
    // is not versioned.
    let vh_chunk_write = if config.versioned {
        let vh_r = vh_insert_ts(
            table_name,
            "insert",
            false,
            &quote! { r },
            None,
            &quote! { conn },
            model_name,
        );
        quote! {
            for r in &chunk_inserted {
                #vh_r
            }
        }
    } else {
        quote! {}
    };

    // Resolves `tenant_id: Option<&_>` in the generated code. With
    // `across_tenants` a missing tenant context is tolerated (the rows are then
    // inserted as given); otherwise a missing context is an error.
    let tenant_id_setup = if config.tenant_scoped {
        quote! {
            let tenant_id = if self.across_tenants {
                ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
            } else {
                let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
                    .ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
                ::core::option::Option::Some(t)
            };
            let tenant_id = tenant_id.as_ref();
        }
    } else {
        quote! {}
    };

    // Expression (without the trailing `map_err`/`?`) that inserts one `chunk`
    // and yields the `get_results` outcome.
    let insert_chunk_expr = if config.tenant_scoped {
        quote! {
            if let ::core::option::Option::Some(t) = tenant_id {
                let values: Vec<_> = chunk.iter().cloned().map(|item| ::autumn_web::tenancy::TenantInsertable::tenant_values(item, t)).collect();
                ::autumn_web::reexports::diesel::insert_into(#table_ident::table)
                    .values(values)
                    .get_results::<#model_name>(conn)
                    .await
            } else {
                ::autumn_web::reexports::diesel::insert_into(#table_ident::table)
                    .values(chunk.to_vec())
                    .get_results::<#model_name>(conn)
                    .await
            }
        }
    } else {
        quote! {
            ::autumn_web::reexports::diesel::insert_into(#table_ident::table)
                .values(chunk.to_vec())
                .get_results::<#model_name>(conn)
                .await
        }
    };

    quote! {
        use ::autumn_web::reexports::diesel::prelude::*;
        use ::autumn_web::reexports::diesel_async::RunQueryDsl;
        use ::autumn_web::reexports::diesel_async::AsyncConnection;
        use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
        use ::autumn_web::repository::AutumnColumnCountSpecific as _;
        use ::autumn_web::repository::AutumnColumnCountFallback as _;
        if new.is_empty() {
            return Ok(Vec::new());
        }
        #tenant_id_setup
        let mut conn = self.__autumn_acquire_conn().await?;
        // All chunks share one transaction, so a failure rolls back every row.
        conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
            async move {
                let mut inserted = Vec::new();
                // Rows per statement: at most 1000, and at most 65535 / columns
                // (presumably to stay under the backend's bind-parameter limit).
                let cols = (&new[0]).__autumn_column_count() + #tenant_extra;
                let chunk_size = if cols == 0 { 1000 } else { (65535usize / cols).min(1000).max(1) };
                for chunk in new.chunks(chunk_size) {
                    let chunk_inserted = #insert_chunk_expr
                        .map_err(::autumn_web::AutumnError::from)?;
                    #vh_chunk_write
                    inserted.extend(chunk_inserted);
                }
                Ok(inserted)
            }
            .scope_boxed()
        })
        .await
    }
};
// Token stream for the body of the generated "insert many, skip invalid rows"
// method (binds `new`, returns `(successes, failures)`).
//
// Strategy of the generated code: each chunk is first inserted as one batch in
// its own transaction. If that batch fails with a constraint violation, the
// chunk is retried row by row (one transaction per row) and every failing row
// is recorded together with its index into `new`. Any other batch error aborts
// the whole call.
let save_many_skip_invalid_body = {
    // Version-history writes after a successful batch insert.
    let vh_skip_batch = if config.versioned {
        let vh = vh_insert_ts(
            table_name,
            "insert",
            false,
            &quote! { r },
            None,
            &quote! { conn },
            model_name,
        );
        quote! { for r in &results { #vh } }
    } else {
        quote! {}
    };
    // Version-history write after a successful single-row insert.
    let vh_skip_row = if config.versioned {
        let vh = vh_insert_ts(
            table_name,
            "insert",
            false,
            &quote! { model },
            None,
            &quote! { conn },
            model_name,
        );
        quote! { #vh }
    } else {
        quote! {}
    };
    // Resolves `tenant_id: Option<&_>`; a missing tenant context is only
    // tolerated when the repository handle is `across_tenants`.
    let tenant_id_setup = if config.tenant_scoped {
        quote! {
            let tenant_id = if self.across_tenants {
                ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
            } else {
                let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
                    .ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
                ::core::option::Option::Some(t)
            };
            let tenant_id = tenant_id.as_ref();
        }
    } else {
        quote! {}
    };
    // Batch insert of the current `chunk`.
    let insert_expr_conn = if config.tenant_scoped {
        quote! {
            if let ::core::option::Option::Some(t) = tenant_id {
                let values: Vec<_> = chunk.iter().cloned().map(|item| ::autumn_web::tenancy::TenantInsertable::tenant_values(item, t)).collect();
                ::autumn_web::reexports::diesel::insert_into(#table_ident::table)
                    .values(values)
                    .get_results::<#model_name>(conn)
                    .await
            } else {
                ::autumn_web::reexports::diesel::insert_into(#table_ident::table)
                    .values(chunk.to_vec())
                    .get_results::<#model_name>(conn)
                    .await
            }
        }
    } else {
        quote! {
            ::autumn_web::reexports::diesel::insert_into(#table_ident::table)
                .values(chunk.to_vec())
                .get_results::<#model_name>(conn)
                .await
        }
    };
    // Single-row insert of `item`, used by the row-by-row fallback.
    let row_insert_expr_conn = if config.tenant_scoped {
        quote! {
            if let ::core::option::Option::Some(t) = tenant_id {
                let values = ::autumn_web::tenancy::TenantInsertable::tenant_values(item.clone(), t);
                ::autumn_web::reexports::diesel::insert_into(#table_ident::table)
                    .values(values)
                    .get_result::<#model_name>(conn)
                    .await
            } else {
                ::autumn_web::reexports::diesel::insert_into(#table_ident::table)
                    .values(item.clone())
                    .get_result::<#model_name>(conn)
                    .await
            }
        }
    } else {
        quote! {
            ::autumn_web::reexports::diesel::insert_into(#table_ident::table)
                .values(item.clone())
                .get_result::<#model_name>(conn)
                .await
        }
    };
    quote! {
        use ::autumn_web::reexports::diesel::prelude::*;
        use ::autumn_web::reexports::diesel_async::RunQueryDsl;
        use ::autumn_web::reexports::diesel_async::AsyncConnection;
        use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
        use ::autumn_web::repository::AutumnColumnCountSpecific as _;
        use ::autumn_web::repository::AutumnColumnCountFallback as _;
        if new.is_empty() {
            return Ok((Vec::new(), Vec::new()));
        }
        #tenant_id_setup
        let mut conn = self.__autumn_acquire_conn().await?;
        let mut successes = Vec::new();
        let mut failures = Vec::new();
        // Index of the current chunk's first element within `new`.
        let mut offset = 0;
        let cols = (&new[0]).__autumn_column_count() + #tenant_extra;
        let chunk_size = if cols == 0 { 1000 } else { (65535usize / cols).min(1000).max(1) };
        for chunk in new.chunks(chunk_size) {
            let batch_res = conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
                async move {
                    let results = (#insert_expr_conn)
                        .map_err(::autumn_web::AutumnError::from)?;
                    #vh_skip_batch
                    Ok(results)
                }
                .scope_boxed()
            })
            .await;
            match batch_res {
                Ok(results) => {
                    successes.extend(results);
                }
                Err(batch_err) => {
                    // Only data-level constraint violations justify the
                    // row-by-row retry; everything else is propagated.
                    let is_constraint_error = ::core::matches!(
                        batch_err.downcast_ref::<::autumn_web::reexports::diesel::result::Error>(),
                        ::core::option::Option::Some(
                            ::autumn_web::reexports::diesel::result::Error::DatabaseError(
                                ::autumn_web::reexports::diesel::result::DatabaseErrorKind::UniqueViolation
                                | ::autumn_web::reexports::diesel::result::DatabaseErrorKind::ForeignKeyViolation
                                | ::autumn_web::reexports::diesel::result::DatabaseErrorKind::NotNullViolation
                                | ::autumn_web::reexports::diesel::result::DatabaseErrorKind::CheckViolation,
                                _,
                            )
                        )
                    );
                    if !is_constraint_error {
                        return ::core::result::Result::Err(batch_err);
                    }
                    for (idx, item) in chunk.iter().enumerate() {
                        let global_idx = offset + idx;
                        let res = conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
                            async move {
                                let model = (#row_insert_expr_conn)
                                    .map_err(::autumn_web::AutumnError::from)?;
                                #vh_skip_row
                                Ok(model)
                            }
                            .scope_boxed()
                        })
                        .await;
                        // NOTE(review): every per-row error, not just
                        // constraint violations, is recorded as a failure here.
                        match res {
                            Ok(model) => successes.push(model),
                            Err(err) => failures.push((global_idx, err)),
                        }
                    }
                }
            }
            offset += chunk.len();
        }
        Ok((successes, failures))
    }
};
let update_many_body = {
let vh_update_pair = if config.versioned {
let vh = vh_insert_ts(
table_name,
"update",
false,
"e! { after_rec },
Some("e! { before_rec }),
"e! { conn },
model_name,
);
quote! {
for after_rec in &chunk_updated {
if let ::core::option::Option::Some(before_rec) = __vh_before_map.get(&after_rec.id) {
#vh
}
}
}
} else {
quote! {}
};
let vh_build_before_map_from_current = if config.versioned {
quote! {
let __vh_before_map: ::std::collections::HashMap<i64, #model_name> =
current_rows.iter().map(|r| (r.id, r.clone())).collect();
}
} else {
quote! {}
};
let vh_load_before_map_no_lock_expr = if config.tenant_scoped {
quote! {
if let ::core::option::Option::Some(t) = tenant_id {
load_query.filter(#table_ident::tenant_id.eq(t)).for_update().load::<#model_name>(conn).await
} else {
load_query.for_update().load::<#model_name>(conn).await
}
}
} else {
quote! {
load_query.for_update().load::<#model_name>(conn).await
}
};
let vh_load_before_map_no_lock = if config.versioned {
quote! {
let mut __vh_before_map = ::std::collections::HashMap::<i64, #model_name>::new();
for chunk in ids.chunks(1000) {
let load_query = #table_ident::table.filter(#table_ident::id.eq_any(chunk))
.order(#table_ident::id.asc());
let chunk_rows = #vh_load_before_map_no_lock_expr
.map_err(::autumn_web::AutumnError::from)?;
for row in chunk_rows {
__vh_before_map.insert(row.id, row);
}
}
}
} else {
quote! {}
};
let tenant_id_setup = if config.tenant_scoped {
quote! {
let tenant_id = if self.across_tenants {
::core::option::Option::None
} else {
let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
.ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
::core::option::Option::Some(t)
};
let tenant_id = tenant_id.as_ref();
}
} else {
quote! {}
};
let set_tenant_expr = quote! {};
let load_expr = if config.tenant_scoped {
quote! {
if let ::core::option::Option::Some(t) = tenant_id {
load_query.filter(#table_ident::tenant_id.eq(t)).for_update().load::<#model_name>(conn).await
} else {
load_query.for_update().load::<#model_name>(conn).await
}
}
} else {
quote! {
load_query.for_update().load::<#model_name>(conn).await
}
};
let update_expr_conn = if config.tenant_scoped {
quote! {
if let ::core::option::Option::Some(t) = tenant_id {
let mut diesel_changeset = changes.__to_changeset();
{
use ::autumn_web::repository::CanSetTenantId as _;
diesel_changeset.set_tenant_id(t.clone());
}
::autumn_web::reexports::diesel::update(#table_ident::table.filter(#table_ident::id.eq_any(chunk)).filter(#table_ident::tenant_id.eq(t)))
.set(diesel_changeset)
.get_results::<#model_name>(conn)
.await
} else {
::autumn_web::reexports::diesel::update(#table_ident::table.filter(#table_ident::id.eq_any(chunk)))
.set(changes.__to_changeset())
.get_results::<#model_name>(conn)
.await
}
}
} else {
quote! {
::autumn_web::reexports::diesel::update(#table_ident::table.filter(#table_ident::id.eq_any(chunk)))
.set(changes.__to_changeset())
.get_results::<#model_name>(conn)
.await
}
};
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::repository::{AutumnLockVersionModelExt as _, AutumnLockVersionUpdateExt as _};
if ids.is_empty() {
return Ok(Vec::new());
}
let __ids_deduped: Vec<i64> = {
let mut __seen = ::std::collections::HashSet::new();
ids.iter().filter(|&&id| __seen.insert(id)).copied().collect()
};
let ids: &[i64] = &__ids_deduped;
#tenant_id_setup
#set_tenant_expr
let mut conn = self.__autumn_acquire_conn().await?;
if let ::core::option::Option::Some(expected_version) = changes.__autumn_lock_version_expected() {
conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut current_rows = Vec::new();
for chunk in ids.chunks(1000) {
let load_query = #table_ident::table.filter(#table_ident::id.eq_any(chunk))
.order(#table_ident::id.asc());
let chunk_rows = #load_expr
.map_err(::autumn_web::AutumnError::from)?;
current_rows.extend(chunk_rows);
}
for current in ¤t_rows {
if let ::core::option::Option::Some(actual_version) = current.__autumn_lock_version_actual() {
if actual_version != expected_version {
return Err(::autumn_web::AutumnError::conflict(
::autumn_web::RepositoryError::Conflict {
id: current.id,
expected_version,
actual_version: ::core::option::Option::Some(actual_version),
},
));
}
}
}
#vh_build_before_map_from_current
let mut updated = Vec::new();
for chunk in ids.chunks(1000) {
let chunk_updated = #update_expr_conn
.map_err(::autumn_web::AutumnError::from)?;
#vh_update_pair
updated.extend(chunk_updated);
}
Ok(updated)
}
.scope_boxed()
})
.await
} else {
conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
async move {
#vh_load_before_map_no_lock
let mut updated = Vec::new();
for chunk in ids.chunks(1000) {
let chunk_updated = #update_expr_conn
.map_err(::autumn_web::AutumnError::from)?;
#vh_update_pair
updated.extend(chunk_updated);
}
Ok(updated)
}
.scope_boxed()
})
.await
}
}
};
// Token stream for the body of the generated bulk-delete method (binds `ids`,
// returns `()`).
//
// The generated code runs one transaction that processes the ids in chunks of
// 1000. Soft-delete repositories stamp `deleted_at` on rows that are not yet
// deleted; others issue a real `DELETE`. Versioned repositories additionally
// lock and load the rows first, collect the ids the statement actually
// affected, and write a "delete" history entry for exactly those rows.
let delete_many_body = {
    // Extra predicate restricting soft-delete repositories to live rows.
    let soft_delete_filter = if config.soft_delete {
        quote! { .filter(#table_ident::deleted_at.is_null()) }
    } else {
        quote! {}
    };
    // Base query selecting the current `chunk` of ids.
    let chunk_target = quote! {
        #table_ident::table.filter(#table_ident::id.eq_any(chunk)) #soft_delete_filter
    };
    // Resolves `tenant_id: Option<&_>`; `across_tenants` disables tenant
    // filtering entirely for deletes.
    let tenant_id_setup = if config.tenant_scoped {
        quote! {
            let tenant_id = if self.across_tenants {
                ::core::option::Option::None
            } else {
                let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
                    .ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
                ::core::option::Option::Some(t)
            };
            let tenant_id = tenant_id.as_ref();
        }
    } else {
        quote! {}
    };

    // Versioned only: lock and load the rows before they are deleted so their
    // last state can be written to the history afterwards.
    let vh_delete_load_before = if config.versioned {
        let load_rows_expr = if config.tenant_scoped {
            quote! {
                if let ::core::option::Option::Some(t) = tenant_id {
                    load_query.filter(#table_ident::tenant_id.eq(t)).for_update().load::<#model_name>(conn).await
                } else {
                    load_query.for_update().load::<#model_name>(conn).await
                }
            }
        } else {
            quote! {
                load_query.for_update().load::<#model_name>(conn).await
            }
        };
        quote! {
            let __vh_unique_ids: Vec<i64> = {
                let mut __seen = ::std::collections::HashSet::new();
                ids.iter().filter(|&&id| __seen.insert(id)).copied().collect()
            };
            let mut __vh_deleted_records: Vec<#model_name> = Vec::new();
            for chunk in __vh_unique_ids.chunks(1000) {
                let load_query = #chunk_target;
                let chunk_rows = #load_rows_expr
                    .map_err(::autumn_web::AutumnError::from)?;
                __vh_deleted_records.extend(chunk_rows);
            }
        }
    } else {
        quote! {}
    };
    // Versioned only: one "delete" history entry per row that was really
    // removed.
    let vh_delete_write = if config.versioned {
        let vh = vh_insert_ts(
            table_name,
            "delete",
            false,
            &quote! { r },
            None,
            &quote! { conn },
            model_name,
        );
        quote! {
            for r in &__vh_deleted_records {
                #vh
            }
        }
    } else {
        quote! {}
    };

    // How the statement is executed: versioned repositories need the affected
    // ids back, the others only need the statement to run.
    let exec_tail = if config.versioned {
        quote! {
            .returning(#table_ident::id)
            .get_results::<i64>(conn)
            .await
        }
    } else {
        quote! {
            .execute(conn)
            .await
        }
    };
    // Builds the soft- or hard-delete statement over `target`.
    let delete_statement = |target: TokenStream| -> TokenStream {
        if config.soft_delete {
            quote! {
                ::autumn_web::reexports::diesel::update(#target)
                    .set(#table_ident::deleted_at.eq(::core::option::Option::Some(__now)))
                    #exec_tail
            }
        } else {
            quote! {
                ::autumn_web::reexports::diesel::delete(#target)
                    #exec_tail
            }
        }
    };
    // Per-chunk delete expression (without the trailing `map_err`/`?`).
    let delete_expr = if config.tenant_scoped {
        let tenant_filtered = delete_statement(quote! { query.filter(#table_ident::tenant_id.eq(t)) });
        let unfiltered = delete_statement(quote! { query });
        let stmts = quote! {
            let query = #chunk_target;
            if let ::core::option::Option::Some(t) = tenant_id {
                #tenant_filtered
            } else {
                #unfiltered
            }
        };
        if config.versioned {
            // Used as a `let` initializer below, so the `let query` statement
            // has to live inside a block expression.
            quote! { { #stmts } }
        } else {
            stmts
        }
    } else {
        delete_statement(chunk_target.clone())
    };

    let (delete_loop, vh_delete_filter) = if config.versioned {
        let loop_ts = quote! {
            let mut __vh_actually_deleted: ::std::collections::HashSet<i64> = ::std::collections::HashSet::new();
            for chunk in ids.chunks(1000) {
                let chunk_deleted_ids = #delete_expr
                    .map_err(::autumn_web::AutumnError::from)?;
                __vh_actually_deleted.extend(chunk_deleted_ids);
            }
        };
        // Keep history candidates only for ids the statement reported back.
        let filter_ts = quote! {
            __vh_deleted_records.retain(|r| __vh_actually_deleted.contains(&r.id));
        };
        (loop_ts, filter_ts)
    } else {
        let loop_ts = quote! {
            for chunk in ids.chunks(1000) {
                #delete_expr
                .map_err(::autumn_web::AutumnError::from)?;
            }
        };
        (loop_ts, quote! {})
    };
    quote! {
        use ::autumn_web::reexports::diesel::prelude::*;
        use ::autumn_web::reexports::diesel_async::RunQueryDsl;
        use ::autumn_web::reexports::diesel_async::AsyncConnection;
        use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
        if ids.is_empty() {
            return Ok(());
        }
        #tenant_id_setup
        let mut conn = self.__autumn_acquire_conn().await?;
        // Single timestamp shared by every soft-deleted row of this call.
        let __now = ::autumn_web::reexports::chrono::Utc::now().naive_utc();
        conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
            async move {
                #vh_delete_load_before
                #delete_loop
                #vh_delete_filter
                #vh_delete_write
                Ok(())
            }
            .scope_boxed()
        })
        .await
    }
};
let upsert_many_body = {
let vh_upsert_write = if config.versioned {
let vh_ins = vh_insert_ts(
table_name,
"insert",
false,
"e! { r },
None,
"e! { conn },
model_name,
);
let vh_upd = vh_insert_ts(
table_name,
"update",
false,
"e! { r },
Some("e! { before_rec }),
"e! { conn },
model_name,
);
quote! {
for r in &chunk_upserted {
if let ::core::option::Option::Some(before_rec) = __vh_before_map.get(&r.id) {
#vh_upd
} else {
#vh_ins
}
}
}
} else {
quote! {}
};
let vh_upsert_before_collect = if config.versioned {
quote! {
let __vh_before_map: ::std::collections::HashMap<i64, #model_name> =
existing_rows.iter().map(|r| (r.id, r.clone())).collect();
}
} else {
quote! {}
};
let vh_upsert_lock_keys = if config.versioned {
let table_name = table_name.clone();
quote! {
let mut __autumn_upsert_lock_ids: Vec<_> =
records.iter().map(|r| r.id).collect();
__autumn_upsert_lock_ids.sort_unstable();
__autumn_upsert_lock_ids.dedup();
for __autumn_upsert_lock_id in __autumn_upsert_lock_ids {
let __autumn_upsert_lock_key =
::autumn_web::repository::repository_upsert_advisory_lock_key(
#table_name,
__autumn_upsert_lock_id,
);
::autumn_web::reexports::diesel::sql_query("SELECT pg_advisory_xact_lock($1)")
.bind::<::autumn_web::reexports::diesel::sql_types::BigInt, _>(
__autumn_upsert_lock_key,
)
.execute(conn)
.await
.map_err(::autumn_web::AutumnError::from)?;
}
}
} else {
quote! {}
};
let tenant_id_setup = if config.tenant_scoped {
quote! {
let tenant_id = if self.across_tenants {
::core::option::Option::None
} else {
let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
.ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
::core::option::Option::Some(t)
};
let tenant_id = tenant_id.as_ref();
let mut records = records.to_vec();
if let ::core::option::Option::Some(t) = tenant_id {
for record in &mut records {
::autumn_web::tenancy::ModelTenantIdMeta::try_set_tenant_id(record, t);
}
}
}
} else {
quote! {
let tenant_id: ::core::option::Option<::std::string::String> = ::core::option::Option::None;
let tenant_id = tenant_id.as_ref();
let records = records;
}
};
let load_expr = if config.tenant_scoped {
quote! {
if let ::core::option::Option::Some(ref t) = tenant_id {
#table_ident::table
.filter(#table_ident::id.eq_any(&chunk_ids))
.filter(#table_ident::tenant_id.eq(t.clone()))
.for_update()
.load::<#model_name>(conn)
.await
} else {
#table_ident::table
.filter(#table_ident::id.eq_any(&chunk_ids))
.for_update()
.load::<#model_name>(conn)
.await
}
}
} else {
quote! {
#table_ident::table
.filter(#table_ident::id.eq_any(&chunk_ids))
.for_update()
.load::<#model_name>(conn)
.await
}
};
let upsert_expr = quote! {
let chunk_upserted = #model_name::__autumn_execute_upsert(
chunk,
tenant_id.map(|t| t.as_str()),
conn,
)
.await
.map_err(::autumn_web::AutumnError::from)?;
};
let size_check = if config.tenant_scoped {
quote! {
if has_lock && upserted.len() != records.len() {
return Err(::autumn_web::AutumnError::conflict_msg(
format!(
"Conflict: only {} of {} records were upserted (potential lock-version/optimistic lock or tenant conflict)",
upserted.len(),
records.len()
)
));
} else if !has_lock && upserted.len() != records.len() {
return Err(::autumn_web::AutumnError::bad_request_msg(
format!(
"Tenant conflict: only {} of {} records were upserted (potential cross-tenant conflict)",
upserted.len(),
records.len()
)
));
}
}
} else {
quote! {
if has_lock && upserted.len() != records.len() {
return Err(::autumn_web::AutumnError::conflict_msg(
format!(
"Conflict: only {} of {} records were upserted (potential lock-version/optimistic lock conflict)",
upserted.len(),
records.len()
)
));
}
}
};
// Body template for the bulk upsert. Flow of the generated code:
//   1. return early on empty input and reject duplicate ids;
//   2. decide `has_lock` from the first record only;
//   3. inside one transaction, process the records in chunks: load the
//      existing rows, compare lock versions, then upsert the chunk;
//   4. run the size check once all chunks are done.
// The spliced fragments (`#tenant_id_setup`, `#vh_upsert_*`, `#load_expr`,
// `#upsert_expr`, `#size_check`) rely on the local names used below, so those
// names must stay as they are.
quote! {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
use ::autumn_web::repository::AutumnColumnCountSpecific as _;
use ::autumn_web::repository::AutumnColumnCountFallback as _;
use ::autumn_web::repository::AutumnUpsertSetExt as _;
use ::autumn_web::repository::AutumnLockVersionModelExt as _;
use ::autumn_web::repository::AutumnUpsertExecutionExt as _;
// Nothing to do for an empty batch.
if records.is_empty() {
return Ok(Vec::new());
}
// The same id appearing twice in one batch is rejected up front.
let mut unique_ids = ::std::collections::HashSet::new();
for record in records.iter() {
if !unique_ids.insert(record.id) {
return Err(::autumn_web::AutumnError::bad_request_msg(
format!("Duplicate record ID detected in bulk upsert: {}", record.id)
));
}
}
// Lock-version handling is switched on based on the first record alone.
let mut has_lock = false;
if let ::core::option::Option::Some(first_rec) = records.first() {
if first_rec.__autumn_lock_version_actual().is_some() {
has_lock = true;
}
}
#tenant_id_setup
let mut conn = self.__autumn_acquire_conn().await?;
conn.transaction::<_, ::autumn_web::AutumnError, _>(|conn| {
async move {
let mut upserted = Vec::new();
// Chunk size keeps rows * columns at or below 65535, capped at 1000 rows and
// never below 1.
// NOTE(review): 65535 presumably mirrors the database bind-parameter limit — confirm.
let cols = (&records[0]).__autumn_column_count() + #tenant_extra;
let chunk_size = if cols == 0 { 1000 } else { (65535usize / cols).min(1000).max(1) };
#vh_upsert_lock_keys
for chunk in records.chunks(chunk_size) {
let chunk_ids: Vec<_> = chunk.iter().map(|r| r.id).collect();
let existing_rows = #load_expr
.map_err(::autumn_web::AutumnError::from)?;
// Lock check: any stored row whose lock version differs from the incoming
// record with the same id aborts the whole transaction with a conflict.
if has_lock {
for existing in &existing_rows {
if let ::core::option::Option::Some(db_lock) = existing.__autumn_lock_version_actual() {
if let ::core::option::Option::Some(incoming) = chunk.iter().find(|r| r.id == existing.id) {
if let ::core::option::Option::Some(incoming_lock) = incoming.__autumn_lock_version_actual() {
if incoming_lock != db_lock {
return Err(::autumn_web::AutumnError::conflict(
::autumn_web::RepositoryError::Conflict {
id: existing.id,
expected_version: incoming_lock,
actual_version: ::core::option::Option::Some(db_lock),
},
));
}
}
}
}
}
}
#vh_upsert_before_collect
#upsert_expr
#vh_upsert_write
upserted.extend(chunk_upserted);
}
#size_check
Ok(upserted)
}
.scope_boxed()
})
.await
}
};
(
struct_fields,
clone_impl,
extractor_init,
save_body,
update_body,
delete_body,
quote! {},
quote! {},
save_many_body,
save_many_skip_invalid_body,
update_many_body,
delete_many_body,
upsert_many_body,
)
};
// Statement spliced at the top of every generated route-info function; it
// registers the repository commit hooks, and is empty when commit hooks are
// not enabled.
let route_hook_registration = if commit_hooks_enabled {
    quote! {
        #pg_name::__autumn_register_repository_commit_hooks();
    }
} else {
    TokenStream::new()
};
// Emits an `inventory::submit!` entry for versioned repositories; empty otherwise.
// NOTE(review): the submitted value is a bare path with no field initialisers —
// confirm `VersionedRepositoryDescriptor` is meant to be submitted as a unit value.
let versioned_inventory_registration = if config.versioned {
quote! {
::autumn_web::reexports::inventory::submit! {
::autumn_web::__private::VersionedRepositoryDescriptor
}
}
} else {
quote! {}
};
// Trait-side declaration of `page`: takes a `PageRequest` and returns a `Send`
// future resolving to `AutumnResult<Page<Model>>`.
let pagination_trait_method = quote! {
fn page(&self, req: &::autumn_web::pagination::PageRequest)
-> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<::autumn_web::pagination::Page<#model_name>>> + Send;
};
// Impl-side `page`: counts the matching rows, loads one page ordered by `id`
// descending, and wraps both in `Page::new`. Tenant-scoped repositories first
// resolve the tenant id (unless `across_tenants` is set) and filter on it.
let pagination_impl_method = {
    // Emits the count query, the page query and the final `Ok(Page::new(..))`
    // for the given base query expression.
    let count_and_load = |base: TokenStream| {
        quote! {
            let total: i64 = #base
                #sd_filter
                .count()
                .get_result::<i64>(&mut conn)
                .await
                .map_err(::autumn_web::AutumnError::from)?;
            let items: ::std::vec::Vec<#model_name> = #base
                #sd_filter
                .order(#table_ident::id.desc())
                .limit(req.limit())
                .offset(req.offset())
                .select(#model_name::as_select())
                .load::<#model_name>(&mut conn)
                .await
                .map_err(::autumn_web::AutumnError::from)?;
            ::core::result::Result::Ok(::autumn_web::pagination::Page::new(items, total, req))
        }
    };
    let method_body = if config.tenant_scoped {
        let tenant_filtered = count_and_load(quote! {
            query
                .filter(#table_ident::tenant_id.eq(t))
        });
        let all_tenants = count_and_load(quote! { query });
        quote! {
            let tenant_id = if self.across_tenants {
                ::core::option::Option::None
            } else {
                let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
                    .ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
                ::core::option::Option::Some(t)
            };
            let mut conn = self.__autumn_acquire_read_conn().await?;
            let query = #table_ident::table;
            if let ::core::option::Option::Some(ref t) = tenant_id {
                #tenant_filtered
            } else {
                #all_tenants
            }
        }
    } else {
        let whole_table = count_and_load(quote! { #table_ident::table });
        quote! {
            let mut conn = self.__autumn_acquire_read_conn().await?;
            #whole_table
        }
    };
    quote! {
        async fn page(
            &self,
            req: &::autumn_web::pagination::PageRequest,
        ) -> ::autumn_web::AutumnResult<::autumn_web::pagination::Page<#model_name>> {
            use ::autumn_web::reexports::diesel::prelude::*;
            use ::autumn_web::reexports::diesel_async::RunQueryDsl;
            #method_body
        }
    }
};
// Fragment applied to a mutable boxed `query`: unless `across_tenants` is set,
// it restricts the query to the current tenant and returns an internal server
// error when no tenant context is available. Empty for repositories that are
// not tenant-scoped.
let tenant_query_filter = if config.tenant_scoped {
    quote! {
        if !self.across_tenants {
            let ::core::option::Option::Some(tenant_id) =
                ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
            else {
                return ::core::result::Result::Err(::autumn_web::AutumnError::internal_server_error_msg(
                    "no tenant context was established"
                ));
            };
            query = query.filter(#table_ident::tenant_id.eq(tenant_id));
        }
    }
} else {
    TokenStream::new()
};
// Trait declaration and impl of `cursor_page`, generated only when a cursor key
// is configured. Both variants share one template; they differ in how the
// cursor is decoded into a resume filter and how a row is turned back into a
// cursor value.
let (cursor_page_trait_method, cursor_page_impl_method) = match &config.cursor_key {
    None => (TokenStream::new(), TokenStream::new()),
    Some(ck) => {
        let cursor_key_ident = format_ident!("{ck}");
        let (resume_filter, cursor_of_row) = match &config.cursor_key_type {
            // Typed key: the cursor is a `(key, id)` pair and rows resume strictly
            // after it in `(key DESC, id DESC)` order.
            Some(key_type) => (
                quote! {
                    if let ::core::option::Option::Some((after_k, after_id)) =
                        req.decode::<(#key_type, i64)>()
                    {
                        query = query.filter(
                            #table_ident::#cursor_key_ident.lt(after_k.clone()).or(
                                #table_ident::#cursor_key_ident
                                    .eq(after_k)
                                    .and(#table_ident::id.lt(after_id)),
                            ),
                        );
                    }
                },
                quote! { |row| (row.#cursor_key_ident.clone(), row.id) },
            ),
            // No key type: the cursor is the bare id.
            // NOTE(review): the query below still orders by the cursor column
            // first while resuming on `id` alone — confirm this variant is only
            // reachable when the cursor key is `id`.
            None => (
                quote! {
                    if let ::core::option::Option::Some(after_id) = req.decode::<i64>() {
                        query = query.filter(#table_ident::id.lt(after_id));
                    }
                },
                quote! { |row| row.id },
            ),
        };
        let trait_method = quote! {
            fn cursor_page(&self, req: &::autumn_web::pagination::CursorRequest)
                -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<::autumn_web::pagination::CursorPage<#model_name>>> + Send;
        };
        let impl_method = quote! {
            async fn cursor_page(
                &self,
                req: &::autumn_web::pagination::CursorRequest,
            ) -> ::autumn_web::AutumnResult<::autumn_web::pagination::CursorPage<#model_name>> {
                use ::autumn_web::reexports::diesel::prelude::*;
                use ::autumn_web::reexports::diesel_async::RunQueryDsl;
                let mut conn = self.__autumn_acquire_read_conn().await?;
                let mut query = #table_ident::table.into_boxed();
                #tenant_query_filter
                #resume_filter
                #[allow(unused_mut)]
                let mut query = query #sd_filter;
                let items: ::std::vec::Vec<#model_name> = query
                    .order((#table_ident::#cursor_key_ident.desc(), #table_ident::id.desc()))
                    .limit(req.fetch_limit())
                    .select(#model_name::as_select())
                    .load::<#model_name>(&mut conn)
                    .await
                    .map_err(::autumn_web::AutumnError::from)?;
                ::core::result::Result::Ok(
                    ::autumn_web::pagination::CursorPage::from_overfetched(
                        items,
                        req,
                        #cursor_of_row,
                    )
                )
            }
        };
        (trait_method, impl_method)
    }
};
// Tenancy trait impls for tenant-scoped repositories: `HasTenantIdColumn` for
// the table, plus a `TenantInsertable` impl for the insert type and one for the
// model type. The two `TenantInsertable` impls are textually identical apart
// from the implementing type, so both come from a single template.
let tenant_scoped_traits = if config.tenant_scoped {
    let insertable_impls: Vec<TokenStream> = [quote! { #new_name }, quote! { #model_name }]
        .iter()
        .map(|target| {
            quote! {
                impl<'a> ::autumn_web::tenancy::TenantInsertable<'a, #table_ident::table> for #target {
                    type Values = <::autumn_web::tenancy::TenantInsertableValuesSelector<'a, Self, #table_ident::table, { <Self as ::autumn_web::tenancy::ModelTenantIdMeta>::HAS_MANUAL_TENANT_ID }> as ::autumn_web::tenancy::GetInsertableValues>::Values;
                    fn tenant_values(self, tenant_id: &'a str) -> Self::Values {
                        ::autumn_web::tenancy::GetInsertableValues::get_values(::autumn_web::tenancy::TenantInsertableValuesSelector::<'a, Self, #table_ident::table, { <Self as ::autumn_web::tenancy::ModelTenantIdMeta>::HAS_MANUAL_TENANT_ID }> {
                            inner: self,
                            tenant_id,
                            _marker: ::core::marker::PhantomData,
                        })
                    }
                }
            }
        })
        .collect();
    quote! {
        impl ::autumn_web::tenancy::HasTenantIdColumn for #table_ident::table {
            type Column = #table_ident::tenant_id;
            fn column() -> Self::Column {
                #table_ident::tenant_id
            }
        }
        #(#insertable_impls)*
    }
} else {
    TokenStream::new()
};
let api_handlers = if let Some(ref api_path) = config.api_path {
// Names for the generated REST surface. Every CRUD action gets a handler
// (`{prefix}_api_{action}`), a route-info function
// (`__autumn_route_info_{prefix}_api_{action}`) and a path helper
// (`__autumn_path_{prefix}_api_{action}`), where `prefix` is the snake-cased
// model name.
let prefix = to_snake_case(&model_name.to_string());
let actions = ["list", "get", "create", "update", "delete"];
let [list_fn, get_fn, create_fn, update_fn, delete_fn] =
    actions.map(|action| format_ident!("{prefix}_api_{action}"));
let [list_info, get_info, create_info, update_info, delete_info] =
    actions.map(|action| format_ident!("__autumn_route_info_{prefix}_api_{action}"));
let [list_path_fn, get_path_fn, create_path_fn, update_path_fn, delete_path_fn] =
    actions.map(|action| format_ident!("__autumn_path_{prefix}_api_{action}"));
// Route path for the single-record endpoints, e.g. `<api_path>/{id}`.
let id_path = format!("{api_path}/{{id}}");
let has_policy = config.policy_type.is_some();
// "show" policy check run by the GET-by-id handler against the loaded
// `record`; empty when the repository has no policy.
let policy_check_show = if has_policy {
    quote! {
        ::autumn_web::authorization::__check_policy::<#model_name>(
            &__autumn_state,
            &__autumn_session,
            "show",
            &record,
        )
        .await?;
    }
} else {
    TokenStream::new()
};
// Create-payload policy check in `?`-propagating form.
// It is spliced only into the non-policy branch of `create_body`, where
// `has_policy` is false, so at that splice point it always expands to nothing;
// the policy branch of `create_body` carries its own inline check.
let policy_check_create_pre = if has_policy {
    quote! {
        ::autumn_web::authorization::__check_policy_create_payload::<#model_name>(
            &__autumn_state,
            &__autumn_session,
            &__autumn_new_payload,
        )
        .await?;
    }
} else {
    TokenStream::new()
};
// JSON body parameter of the create handler. With a policy the body arrives as
// a raw `serde_json::Value` named `__autumn_new_payload`; without one it is
// deserialised directly into the insert type as `new`.
let create_payload_arg = {
    let raw_json_payload = quote! {
        ::autumn_web::prelude::Json(__autumn_new_payload): ::autumn_web::prelude::Json<
            ::autumn_web::reexports::serde_json::Value
        >
    };
    let typed_payload = quote! {
        ::autumn_web::prelude::Json(new): ::autumn_web::prelude::Json<#new_name>
    };
    if has_policy {
        raw_json_payload
    } else {
        typed_payload
    }
};
// Decodes the raw create payload into the insert type, mapping failures to an
// "unprocessable" error.
// It is spliced only into the non-policy branch of `create_body`, where
// `has_policy` is false, so at that splice point it always expands to nothing;
// the policy branch of `create_body` decodes inline.
let decode_create_payload = if has_policy {
    quote! {
        let new: #new_name = ::autumn_web::reexports::serde_json::from_value(
            __autumn_new_payload.clone(),
        )
        .map_err(|err| ::autumn_web::AutumnError::unprocessable_msg(err.to_string()))?;
    }
} else {
    TokenStream::new()
};
// `?`-propagating pre-checks for update and delete: load the existing record
// from the primary, return "not found" when it is missing, then run the policy
// for the given action. The two fragments differ only in the action name.
// Each is spliced only into the non-policy branch of its handler body, where
// `has_policy` is false, so at those splice points they always expand to
// nothing; the policy branches carry their own inline checks.
let [policy_check_update_pre, policy_check_delete_pre] = ["update", "delete"].map(|action| {
    if has_policy {
        quote! {
            let __existing = repo.on_primary().find_by_id(id).await?
                .ok_or_else(|| ::autumn_web::AutumnError::not_found_msg("not found"))?;
            ::autumn_web::authorization::__check_policy::<#model_name>(
                &__autumn_state,
                &__autumn_session,
                #action,
                &__existing,
            )
            .await?;
        }
    } else {
        TokenStream::new()
    }
});
// Extra leading handler parameters for policy-protected get/create/update/
// delete handlers: application state, session, and the optional idempotency
// replay extension. Empty when the repository has no policy.
let session_state_args = if has_policy {
    quote! {
        ::autumn_web::reexports::axum::extract::State(__autumn_state):
            ::autumn_web::reexports::axum::extract::State<::autumn_web::AppState>,
        __autumn_session: ::autumn_web::session::Session,
        __autumn_idempotency_replay: ::core::option::Option<
            ::autumn_web::reexports::axum::extract::Extension<
                ::autumn_web::idempotency::IdempotencyReplayResponse
            >
        >,
    }
} else {
    TokenStream::new()
};
// Body of the list handler. A configured scope takes precedence and delegates
// the listing to the registered scope; otherwise a policy filters the full
// result set record by record via `can_show`; with neither, every record is
// returned.
let scope_list_body = match (config.scope_type.is_some(), has_policy) {
    (true, _) => quote! {
        let __scope = __autumn_state
            .scope::<#model_name>()
            .ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg(
                "missing scope registration"
            ))?;
        let __ctx = ::autumn_web::authorization::PolicyContext::from_request(
            &__autumn_state,
            &__autumn_session,
        ).await;
        let mut __conn = repo.__autumn_acquire_read_conn().await?;
        let records = __scope.list(&__ctx, &mut __conn).await?;
        Ok(::autumn_web::prelude::Json(records))
    },
    (false, true) => quote! {
        let __policy = __autumn_state
            .policy::<#model_name>()
            .ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg(
                "missing policy registration"
            ))?;
        let __ctx = ::autumn_web::authorization::PolicyContext::from_request(
            &__autumn_state,
            &__autumn_session,
        ).await;
        let __all = repo.find_all().await?;
        let mut __filtered = ::std::vec::Vec::with_capacity(__all.len());
        for __record in __all {
            if __policy.can_show(&__ctx, &__record).await {
                __filtered.push(__record);
            }
        }
        Ok(::autumn_web::prelude::Json(__filtered))
    },
    (false, false) => quote! {
        Ok(::autumn_web::prelude::Json(repo.find_all().await?))
    },
};
// Extra leading parameters of the list handler: state and session are needed
// as soon as either a scope or a policy is configured.
let list_needs_auth_context = config.scope_type.is_some() || has_policy;
let list_session_state_args = if list_needs_auth_context {
    quote! {
        ::autumn_web::reexports::axum::extract::State(__autumn_state):
            ::autumn_web::reexports::axum::extract::State<::autumn_web::AppState>,
        __autumn_session: ::autumn_web::session::Session,
    }
} else {
    TokenStream::new()
};
// String forms of the model name and API path, interpolated as literals into
// each route's `RepositoryApiMeta`.
let (resource_type_name_lit, api_path_lit) = (model_name.to_string(), api_path.clone());
// Compile-time check that the configured policy type implements
// `Policy<Model>`; nothing is emitted when no policy is configured.
let policy_type_assertion = config
    .policy_type
    .as_ref()
    .map_or_else(TokenStream::new, |policy_type| {
        quote! {
            const _: fn() = || {
                fn __autumn_assert_policy<P: ::autumn_web::authorization::Policy<#model_name>>() {}
                __autumn_assert_policy::<#policy_type>();
            };
        }
    });
// Value for `RepositoryApiMeta::policy_check`: a function pointer reporting
// whether the registry holds a policy for the model, or `None` when the
// repository declares no policy.
let policy_check_fn = if has_policy {
    quote! {
        ::core::option::Option::Some(
            (|registry: &::autumn_web::authorization::PolicyRegistry| {
                registry.has_policy::<#model_name>()
            }) as fn(&::autumn_web::authorization::PolicyRegistry) -> bool
        )
    }
} else {
    quote! { ::core::option::Option::None }
};
// Values for `RepositoryApiMeta::scope_check`. Only the list route ever gets a
// scope check; every other route always uses `None`.
let non_list_scope_check_fn = quote! { ::core::option::Option::None };
let list_scope_check_fn = if config.scope_type.is_some() {
    quote! {
        ::core::option::Option::Some(
            (|registry: &::autumn_web::authorization::PolicyRegistry| {
                registry.scope::<#model_name>().is_some()
            }) as fn(&::autumn_web::authorization::PolicyRegistry) -> bool
        )
    }
} else {
    non_list_scope_check_fn.clone()
};
// Compile-time check that the configured scope type implements `Scope<Model>`;
// nothing is emitted when no scope is configured.
let scope_type_assertion = config
    .scope_type
    .as_ref()
    .map_or_else(TokenStream::new, |scope_type| {
        quote! {
            const _: fn() = || {
                fn __autumn_assert_scope<S: ::autumn_web::authorization::Scope<#model_name>>() {}
                __autumn_assert_scope::<#scope_type>();
            };
        }
    });
// Return type of the create handler: `AutumnResult<(StatusCode, Json<Model>)>`,
// wrapped in `IdempotencyReplayOr` when a policy is configured.
let create_return_type = {
    let plain_result = quote! {
        ::autumn_web::AutumnResult<(
            ::autumn_web::reexports::http::StatusCode,
            ::autumn_web::prelude::Json<#model_name>
        )>
    };
    if has_policy {
        quote! {
            ::autumn_web::idempotency::IdempotencyReplayOr<
                #plain_result
            >
        }
    } else {
        plain_result
    }
};
// Body of the create handler.
// With a policy, every early exit is wrapped in `IdempotencyReplayOr::Inner`
// and the steps run in this order: decode the payload, check the create
// policy, return a stored idempotency replay if one exists, then save.
// Without a policy the record is saved directly.
let create_body = if has_policy {
quote! {
// Decode first; failures are reported as "unprocessable".
let new: #new_name = match ::autumn_web::reexports::serde_json::from_value(
__autumn_new_payload.clone(),
) {
::core::result::Result::Ok(new) => new,
::core::result::Result::Err(err) => {
return ::autumn_web::idempotency::IdempotencyReplayOr::Inner(
::core::result::Result::Err(
::autumn_web::AutumnError::unprocessable_msg(err.to_string())
)
);
}
};
// The policy check runs before any replay is served.
if let ::core::result::Result::Err(err) =
::autumn_web::authorization::__check_policy_create_payload::<#model_name>(
&__autumn_state,
&__autumn_session,
&__autumn_new_payload,
)
.await
{
return ::autumn_web::idempotency::IdempotencyReplayOr::Inner(
::core::result::Result::Err(err)
);
}
if let ::core::option::Option::Some(response) =
::autumn_web::idempotency::__replay_response(&__autumn_idempotency_replay)
{
return ::autumn_web::idempotency::IdempotencyReplayOr::Replay(response);
}
let record = match repo.save(&new).await {
::core::result::Result::Ok(record) => record,
::core::result::Result::Err(err) => {
return ::autumn_web::idempotency::IdempotencyReplayOr::Inner(
::core::result::Result::Err(err)
);
}
};
::autumn_web::idempotency::IdempotencyReplayOr::Inner(::core::result::Result::Ok((
::autumn_web::reexports::http::StatusCode::CREATED,
::autumn_web::prelude::Json(record)
)))
}
} else {
quote! {
// Both fragments below are empty here: they are non-empty only when
// `has_policy` is true, and this branch is taken only when it is false.
#decode_create_payload
#policy_check_create_pre
let record = repo.save(&new).await?;
Ok((::autumn_web::reexports::http::StatusCode::CREATED, ::autumn_web::prelude::Json(record)))
}
};
// Return type of the update handler: `AutumnResult<Json<Model>>`, wrapped in
// `IdempotencyReplayOr` when a policy is configured.
let update_return_type = {
    let plain_result = quote! {
        ::autumn_web::AutumnResult<::autumn_web::prelude::Json<#model_name>>
    };
    if has_policy {
        quote! {
            ::autumn_web::idempotency::IdempotencyReplayOr<
                #plain_result
            >
        }
    } else {
        plain_result
    }
};
// Body of the update handler.
// With a policy, every early exit is wrapped in `IdempotencyReplayOr::Inner`
// and the steps run in this order: load the existing record from the primary
// ("not found" if absent), check the "update" policy against it, return a
// stored idempotency replay if one exists, then apply the patch.
// Without a policy the patch is applied directly.
let update_body = if has_policy {
quote! {
let __existing = match repo.on_primary().find_by_id(id).await {
::core::result::Result::Ok(::core::option::Option::Some(existing)) => existing,
::core::result::Result::Ok(::core::option::Option::None) => {
return ::autumn_web::idempotency::IdempotencyReplayOr::Inner(
::core::result::Result::Err(::autumn_web::AutumnError::not_found_msg("not found"))
);
}
::core::result::Result::Err(err) => {
return ::autumn_web::idempotency::IdempotencyReplayOr::Inner(
::core::result::Result::Err(err)
);
}
};
// The policy check runs before any replay is served.
if let ::core::result::Result::Err(err) =
::autumn_web::authorization::__check_policy::<#model_name>(
&__autumn_state,
&__autumn_session,
"update",
&__existing,
)
.await
{
return ::autumn_web::idempotency::IdempotencyReplayOr::Inner(
::core::result::Result::Err(err)
);
}
if let ::core::option::Option::Some(response) =
::autumn_web::idempotency::__replay_response(&__autumn_idempotency_replay)
{
return ::autumn_web::idempotency::IdempotencyReplayOr::Replay(response);
}
let record = match repo.update(id, &patch).await {
::core::result::Result::Ok(record) => record,
::core::result::Result::Err(err) => {
return ::autumn_web::idempotency::IdempotencyReplayOr::Inner(
::core::result::Result::Err(err)
);
}
};
::autumn_web::idempotency::IdempotencyReplayOr::Inner(
::core::result::Result::Ok(::autumn_web::prelude::Json(record))
)
}
} else {
quote! {
// Empty here: the fragment is non-empty only when `has_policy` is true, and
// this branch is taken only when it is false.
#policy_check_update_pre
let record = repo.update(id, &patch).await?;
Ok(::autumn_web::prelude::Json(record))
}
};
// Return type of the delete handler: `AutumnResult<StatusCode>`, wrapped in
// `IdempotencyReplayOr` when a policy is configured.
let delete_return_type = {
    let plain_result = quote! {
        ::autumn_web::AutumnResult<::autumn_web::reexports::http::StatusCode>
    };
    if has_policy {
        quote! {
            ::autumn_web::idempotency::IdempotencyReplayOr<
                #plain_result
            >
        }
    } else {
        plain_result
    }
};
// Body of the delete handler.
// With a policy, every early exit is wrapped in `IdempotencyReplayOr::Inner`
// and the steps run in this order:
//   1. read the stored replay response and the "repository.delete.record"
//      replay metadata;
//   2. load the existing record from the primary; if it is gone, fall back to
//      deserialising the record from the replay metadata, otherwise "not found";
//   3. check the "delete" policy against that record;
//   4. return the stored replay response if there is one;
//   5. serialise the record, delete it, and return 204 together with the
//      serialised record as replay metadata.
// Without a policy the record is deleted directly.
let delete_body = if has_policy {
quote! {
let __autumn_replay_response =
::autumn_web::idempotency::__replay_response(&__autumn_idempotency_replay);
let __autumn_replay_deleted_record =
::autumn_web::idempotency::__replay_metadata(
&__autumn_idempotency_replay,
"repository.delete.record",
);
let __existing = match repo.on_primary().find_by_id(id).await {
::core::result::Result::Ok(::core::option::Option::Some(existing)) => existing,
::core::result::Result::Ok(::core::option::Option::None) => {
// Record no longer in the database: use the copy stored in the replay
// metadata, if any, so the policy check below still has a record to inspect.
if let ::core::option::Option::Some(bytes) = __autumn_replay_deleted_record {
match ::autumn_web::reexports::serde_json::from_slice::<#model_name>(&bytes) {
::core::result::Result::Ok(existing) => existing,
::core::result::Result::Err(err) => {
return ::autumn_web::idempotency::IdempotencyReplayOr::Inner(
::core::result::Result::Err(
::autumn_web::AutumnError::internal_server_error_msg(err.to_string())
)
);
}
}
} else {
return ::autumn_web::idempotency::IdempotencyReplayOr::Inner(
::core::result::Result::Err(::autumn_web::AutumnError::not_found_msg("not found"))
);
}
}
::core::result::Result::Err(err) => {
return ::autumn_web::idempotency::IdempotencyReplayOr::Inner(
::core::result::Result::Err(err)
);
}
};
// The policy check runs before any replay is served.
if let ::core::result::Result::Err(err) =
::autumn_web::authorization::__check_policy::<#model_name>(
&__autumn_state,
&__autumn_session,
"delete",
&__existing,
)
.await
{
return ::autumn_web::idempotency::IdempotencyReplayOr::Inner(
::core::result::Result::Err(err)
);
}
if let ::core::option::Option::Some(response) = __autumn_replay_response {
return ::autumn_web::idempotency::IdempotencyReplayOr::Replay(response);
}
// Serialise before deleting so the record can accompany the response as
// replay metadata.
let __autumn_deleted_record_metadata =
match ::autumn_web::reexports::serde_json::to_vec(&__existing) {
::core::result::Result::Ok(bytes) => bytes,
::core::result::Result::Err(err) => {
return ::autumn_web::idempotency::IdempotencyReplayOr::Inner(
::core::result::Result::Err(
::autumn_web::AutumnError::internal_server_error_msg(err.to_string())
)
);
}
};
if let ::core::result::Result::Err(err) = repo.delete_by_id(id).await {
return ::autumn_web::idempotency::IdempotencyReplayOr::Inner(
::core::result::Result::Err(err)
);
}
::autumn_web::idempotency::IdempotencyReplayOr::InnerWithReplayMetadata(
::core::result::Result::Ok(::autumn_web::reexports::http::StatusCode::NO_CONTENT),
::std::vec![(
"repository.delete.record".to_owned(),
__autumn_deleted_record_metadata,
)],
)
}
} else {
quote! {
// Empty here: the fragment is non-empty only when `has_policy` is true, and
// this branch is taken only when it is false.
#policy_check_delete_pre
repo.delete_by_id(id).await?;
Ok(::autumn_web::reexports::http::StatusCode::NO_CONTENT)
}
};
// Method-router expressions for the three mutating routes. Policy-protected
// handlers are registered bare; without a policy each router is wrapped in
// `IdempotencyReplayLayer`.
let (create_handler_expr, update_handler_expr, delete_handler_expr) = {
    let bare_routers = [
        quote! { ::autumn_web::reexports::axum::routing::post(#create_fn) },
        quote! { ::autumn_web::reexports::axum::routing::put(#update_fn) },
        quote! { ::autumn_web::reexports::axum::routing::delete(#delete_fn) },
    ];
    let [create, update, delete] = if has_policy {
        bare_routers
    } else {
        bare_routers.map(|router| {
            quote! {
                ::autumn_web::reexports::axum::routing::MethodRouter::<
                    ::autumn_web::AppState, ::core::convert::Infallible
                >::layer(
                    #router,
                    ::autumn_web::idempotency::IdempotencyReplayLayer,
                )
            }
        })
    };
    (create, update, delete)
};
// Final template for the generated REST API. For each of list/get/create/
// update/delete it emits:
//   * the async handler function,
//   * a hidden route-info function returning the `::autumn_web::Route`
//     (method, path, handler, OpenAPI doc, repository metadata),
// followed by five hidden path-helper functions that build the URL strings.
quote! {
#policy_type_assertion
#scope_type_assertion
// ---- list ----
#vis async fn #list_fn(
#list_session_state_args
repo: #pg_name,
) -> ::autumn_web::AutumnResult<::autumn_web::prelude::Json<Vec<#model_name>>> {
#scope_list_body
}
#[doc(hidden)]
#vis fn #list_info() -> ::autumn_web::Route {
#route_hook_registration
::autumn_web::Route {
method: ::autumn_web::reexports::http::Method::GET,
path: #api_path,
handler: ::autumn_web::reexports::axum::routing::MethodRouter::<
::autumn_web::AppState, ::core::convert::Infallible
>::layer(
::autumn_web::reexports::axum::routing::get(#list_fn),
::autumn_web::idempotency::IdempotencyReplayLayer,
),
name: ::core::stringify!(#list_fn),
api_doc: ::autumn_web::openapi::ApiDoc {
method: "GET",
path: #api_path,
operation_id: ::core::stringify!(#list_fn),
success_status: 200,
response: ::core::option::Option::Some(
::autumn_web::openapi::SchemaEntry {
name: "array",
kind: ::autumn_web::openapi::SchemaKind::Array(
&::autumn_web::openapi::SchemaEntry {
name: ::core::stringify!(#model_name),
kind: ::autumn_web::openapi::SchemaKind::Ref,
}
),
}
),
..::core::default::Default::default()
},
repository: ::core::option::Option::Some(::autumn_web::RepositoryApiMeta {
resource_type_name: #resource_type_name_lit,
api_path: #api_path_lit,
has_policy: #has_policy,
policy_check: #policy_check_fn,
// The list route is the only one that carries a scope check.
scope_check: #list_scope_check_fn,
}),
idempotency: ::autumn_web::RouteIdempotency::ReplayThroughInner,
api_version: ::core::option::Option::None,
sunset_opt_out: false,
}
}
// ---- get by id ----
#vis async fn #get_fn(
#session_state_args
::autumn_web::extract::Path(id): ::autumn_web::extract::Path<i64>,
repo: #pg_name,
) -> ::autumn_web::AutumnResult<::autumn_web::prelude::Json<#model_name>> {
let record = repo.find_by_id(id).await?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg("not found"))?;
#policy_check_show
Ok(::autumn_web::prelude::Json(record))
}
#[doc(hidden)]
#vis fn #get_info() -> ::autumn_web::Route {
#route_hook_registration
::autumn_web::Route {
method: ::autumn_web::reexports::http::Method::GET,
path: #id_path,
handler: ::autumn_web::reexports::axum::routing::MethodRouter::<
::autumn_web::AppState, ::core::convert::Infallible
>::layer(
::autumn_web::reexports::axum::routing::get(#get_fn),
::autumn_web::idempotency::IdempotencyReplayLayer,
),
name: ::core::stringify!(#get_fn),
api_doc: ::autumn_web::openapi::ApiDoc {
method: "GET",
path: #id_path,
operation_id: ::core::stringify!(#get_fn),
path_params: &["id"],
success_status: 200,
response: ::core::option::Option::Some(
::autumn_web::openapi::SchemaEntry {
name: ::core::stringify!(#model_name),
kind: ::autumn_web::openapi::SchemaKind::Ref,
}
),
..::core::default::Default::default()
},
repository: ::core::option::Option::Some(::autumn_web::RepositoryApiMeta {
resource_type_name: #resource_type_name_lit,
api_path: #api_path_lit,
has_policy: #has_policy,
policy_check: #policy_check_fn,
scope_check: #non_list_scope_check_fn,
}),
idempotency: ::autumn_web::RouteIdempotency::ReplayThroughInner,
api_version: ::core::option::Option::None,
sunset_opt_out: false,
}
}
// ---- create ----
#vis async fn #create_fn(
#session_state_args
repo: #pg_name,
#create_payload_arg,
) -> #create_return_type {
#create_body
}
#[doc(hidden)]
#vis fn #create_info() -> ::autumn_web::Route {
#route_hook_registration
::autumn_web::Route {
method: ::autumn_web::reexports::http::Method::POST,
path: #api_path,
handler: #create_handler_expr,
name: ::core::stringify!(#create_fn),
api_doc: ::autumn_web::openapi::ApiDoc {
method: "POST",
path: #api_path,
operation_id: ::core::stringify!(#create_fn),
success_status: 201,
request_body: ::core::option::Option::Some(
::autumn_web::openapi::SchemaEntry {
name: ::core::stringify!(#new_name),
kind: ::autumn_web::openapi::SchemaKind::Ref,
}
),
response: ::core::option::Option::Some(
::autumn_web::openapi::SchemaEntry {
name: ::core::stringify!(#model_name),
kind: ::autumn_web::openapi::SchemaKind::Ref,
}
),
..::core::default::Default::default()
},
repository: ::core::option::Option::Some(::autumn_web::RepositoryApiMeta {
resource_type_name: #resource_type_name_lit,
api_path: #api_path_lit,
has_policy: #has_policy,
policy_check: #policy_check_fn,
scope_check: #non_list_scope_check_fn,
}),
idempotency: ::autumn_web::RouteIdempotency::ReplayThroughInner,
api_version: ::core::option::Option::None,
sunset_opt_out: false,
}
}
// ---- update ----
#vis async fn #update_fn(
#session_state_args
::autumn_web::extract::Path(id): ::autumn_web::extract::Path<i64>,
repo: #pg_name,
::autumn_web::prelude::Json(patch): ::autumn_web::prelude::Json<#update_name>,
) -> #update_return_type {
#update_body
}
#[doc(hidden)]
#vis fn #update_info() -> ::autumn_web::Route {
#route_hook_registration
::autumn_web::Route {
method: ::autumn_web::reexports::http::Method::PUT,
path: #id_path,
handler: #update_handler_expr,
name: ::core::stringify!(#update_fn),
api_doc: ::autumn_web::openapi::ApiDoc {
method: "PUT",
path: #id_path,
operation_id: ::core::stringify!(#update_fn),
path_params: &["id"],
success_status: 200,
request_body: ::core::option::Option::Some(
::autumn_web::openapi::SchemaEntry {
name: ::core::stringify!(#update_name),
kind: ::autumn_web::openapi::SchemaKind::Ref,
}
),
response: ::core::option::Option::Some(
::autumn_web::openapi::SchemaEntry {
name: ::core::stringify!(#model_name),
kind: ::autumn_web::openapi::SchemaKind::Ref,
}
),
..::core::default::Default::default()
},
repository: ::core::option::Option::Some(::autumn_web::RepositoryApiMeta {
resource_type_name: #resource_type_name_lit,
api_path: #api_path_lit,
has_policy: #has_policy,
policy_check: #policy_check_fn,
scope_check: #non_list_scope_check_fn,
}),
idempotency: ::autumn_web::RouteIdempotency::ReplayThroughInner,
api_version: ::core::option::Option::None,
sunset_opt_out: false,
}
}
// ---- delete ----
#vis async fn #delete_fn(
#session_state_args
::autumn_web::extract::Path(id): ::autumn_web::extract::Path<i64>,
repo: #pg_name,
) -> #delete_return_type {
#delete_body
}
#[doc(hidden)]
#vis fn #delete_info() -> ::autumn_web::Route {
#route_hook_registration
::autumn_web::Route {
method: ::autumn_web::reexports::http::Method::DELETE,
path: #id_path,
handler: #delete_handler_expr,
name: ::core::stringify!(#delete_fn),
api_doc: ::autumn_web::openapi::ApiDoc {
method: "DELETE",
path: #id_path,
operation_id: ::core::stringify!(#delete_fn),
path_params: &["id"],
success_status: 204,
..::core::default::Default::default()
},
repository: ::core::option::Option::Some(::autumn_web::RepositoryApiMeta {
resource_type_name: #resource_type_name_lit,
api_path: #api_path_lit,
has_policy: #has_policy,
policy_check: #policy_check_fn,
scope_check: #non_list_scope_check_fn,
}),
idempotency: ::autumn_web::RouteIdempotency::ReplayThroughInner,
api_version: ::core::option::Option::None,
sunset_opt_out: false,
}
}
// ---- path helpers ----
// Collection routes return the API path as-is; single-record routes append the
// id after passing it through `encode_path_segment`.
#[doc(hidden)]
#vis fn #list_path_fn() -> ::std::string::String {
#api_path.to_owned()
}
#[doc(hidden)]
#vis fn #get_path_fn(id: impl ::std::fmt::Display) -> ::std::string::String {
format!("{}/{}", #api_path, ::autumn_web::paths::encode_path_segment(id))
}
#[doc(hidden)]
#vis fn #create_path_fn() -> ::std::string::String {
#api_path.to_owned()
}
#[doc(hidden)]
#vis fn #update_path_fn(id: impl ::std::fmt::Display) -> ::std::string::String {
format!("{}/{}", #api_path, ::autumn_web::paths::encode_path_segment(id))
}
#[doc(hidden)]
#vis fn #delete_path_fn(id: impl ::std::fmt::Display) -> ::std::string::String {
format!("{}/{}", #api_path, ::autumn_web::paths::encode_path_segment(id))
}
}
} else {
quote! {}
};
// Extra trait methods declared for soft-delete repositories: `restore`,
// `purge`, `with_deleted`, `only_deleted` and `page_only_deleted`. Nothing is
// declared when soft delete is off.
let soft_delete_trait_methods = if config.soft_delete {
    quote! {
        fn restore(&self, id: i64) -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<()>> + Send;
        fn purge(&self, id: i64) -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<()>> + Send;
        fn with_deleted(&self) -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<Vec<#model_name>>> + Send;
        fn only_deleted(&self) -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<Vec<#model_name>>> + Send;
        fn page_only_deleted(&self, req: &::autumn_web::pagination::PageRequest) -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<::autumn_web::pagination::Page<#model_name>>> + Send;
    }
} else {
    TokenStream::new()
};
// Generated bodies for the soft-delete trait methods (`restore`, `purge`, `with_deleted`,
// `only_deleted`, `page_only_deleted`). Expands to nothing unless `config.soft_delete` is set.
//
// Two variants exist: the tenant-scoped one first resolves `tenant_id` and, when one is
// present, adds a `tenant_id = <tenant>` filter to every statement; the plain variant runs
// the same statements unfiltered.
//
// All emitted paths are fully qualified (`::core::result::Result::Ok`, `::std::vec::Vec`,
// `::std::format!`, ...) because the tokens are expanded in the caller's module, where bare
// `Ok`/`Err`/`Vec`/`format!` could be shadowed (for example by `use anyhow::Ok;`).
let soft_delete_impl_methods = if config.soft_delete {
    // Shared tail of `restore`/`purge`: a statement that touched zero rows is reported as
    // "not found"; otherwise the method succeeds with `()`. Expects `__count` and `id` in scope.
    let reject_if_untouched = quote! {
        if __count == 0 {
            return ::core::result::Result::Err(::autumn_web::AutumnError::not_found_msg(
                ::std::format!("{} with id {} not found", ::core::stringify!(#model_name), id)
            ));
        }
        ::core::result::Result::Ok(())
    };
    if config.tenant_scoped {
        // Binds `tenant_id`: `None` when the repository runs across tenants, otherwise the
        // current tenant. A missing tenant context is an internal server error.
        let tenant_id_setup = quote! {
            let tenant_id = if self.across_tenants {
                ::core::option::Option::None
            } else {
                let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
                    .ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
                ::core::option::Option::Some(t)
            };
        };
        quote! {
            // Clears `deleted_at` on the row with the given id (within the tenant, if any).
            async fn restore(&self, id: i64) -> ::autumn_web::AutumnResult<()> {
                use ::autumn_web::reexports::diesel::prelude::*;
                use ::autumn_web::reexports::diesel_async::RunQueryDsl;
                #tenant_id_setup
                let mut conn = self.__autumn_acquire_conn().await?;
                let query = #table_ident::table.find(id);
                let __count = if let ::core::option::Option::Some(ref t) = tenant_id {
                    ::autumn_web::reexports::diesel::update(query.filter(#table_ident::tenant_id.eq(t)))
                        .set(#table_ident::deleted_at.eq(::core::option::Option::None::<::autumn_web::reexports::chrono::NaiveDateTime>))
                        .execute(&mut conn)
                        .await
                } else {
                    ::autumn_web::reexports::diesel::update(query)
                        .set(#table_ident::deleted_at.eq(::core::option::Option::None::<::autumn_web::reexports::chrono::NaiveDateTime>))
                        .execute(&mut conn)
                        .await
                }
                .map_err(::autumn_web::AutumnError::from)?;
                #reject_if_untouched
            }
            // Physically deletes the row with the given id (within the tenant, if any).
            async fn purge(&self, id: i64) -> ::autumn_web::AutumnResult<()> {
                use ::autumn_web::reexports::diesel::prelude::*;
                use ::autumn_web::reexports::diesel_async::RunQueryDsl;
                #tenant_id_setup
                let mut conn = self.__autumn_acquire_conn().await?;
                let query = #table_ident::table.find(id);
                let __count = if let ::core::option::Option::Some(ref t) = tenant_id {
                    ::autumn_web::reexports::diesel::delete(query.filter(#table_ident::tenant_id.eq(t)))
                        .execute(&mut conn)
                        .await
                } else {
                    ::autumn_web::reexports::diesel::delete(query)
                        .execute(&mut conn)
                        .await
                }
                .map_err(::autumn_web::AutumnError::from)?;
                #reject_if_untouched
            }
            // Loads every row, with no `deleted_at` filter applied.
            async fn with_deleted(&self) -> ::autumn_web::AutumnResult<::std::vec::Vec<#model_name>> {
                use ::autumn_web::reexports::diesel::prelude::*;
                use ::autumn_web::reexports::diesel_async::RunQueryDsl;
                #tenant_id_setup
                let mut conn = self.__autumn_acquire_read_conn().await?;
                let query = #table_ident::table;
                if let ::core::option::Option::Some(ref t) = tenant_id {
                    query.filter(#table_ident::tenant_id.eq(t))
                        .load::<#model_name>(&mut conn)
                        .await
                } else {
                    query
                        .load::<#model_name>(&mut conn)
                        .await
                }
                .map_err(::autumn_web::AutumnError::from)
            }
            // Loads only rows whose `deleted_at` is set.
            async fn only_deleted(&self) -> ::autumn_web::AutumnResult<::std::vec::Vec<#model_name>> {
                use ::autumn_web::reexports::diesel::prelude::*;
                use ::autumn_web::reexports::diesel_async::RunQueryDsl;
                #tenant_id_setup
                let mut conn = self.__autumn_acquire_read_conn().await?;
                let query = #table_ident::table.filter(#table_ident::deleted_at.is_not_null());
                if let ::core::option::Option::Some(ref t) = tenant_id {
                    query.filter(#table_ident::tenant_id.eq(t))
                        .load::<#model_name>(&mut conn)
                        .await
                } else {
                    query
                        .load::<#model_name>(&mut conn)
                        .await
                }
                .map_err(::autumn_web::AutumnError::from)
            }
            // Pages through soft-deleted rows, newest id first: one COUNT query for the total,
            // then one LIMIT/OFFSET query for the items.
            async fn page_only_deleted(
                &self,
                req: &::autumn_web::pagination::PageRequest,
            ) -> ::autumn_web::AutumnResult<::autumn_web::pagination::Page<#model_name>> {
                use ::autumn_web::reexports::diesel::prelude::*;
                use ::autumn_web::reexports::diesel_async::RunQueryDsl;
                #tenant_id_setup
                let mut conn = self.__autumn_acquire_read_conn().await?;
                let query = #table_ident::table.filter(#table_ident::deleted_at.is_not_null());
                if let ::core::option::Option::Some(ref t) = tenant_id {
                    let total: i64 = query
                        .filter(#table_ident::tenant_id.eq(t))
                        .count()
                        .get_result::<i64>(&mut conn)
                        .await
                        .map_err(::autumn_web::AutumnError::from)?;
                    let items: ::std::vec::Vec<#model_name> = query
                        .filter(#table_ident::tenant_id.eq(t))
                        .order(#table_ident::id.desc())
                        .limit(req.limit())
                        .offset(req.offset())
                        .select(#model_name::as_select())
                        .load::<#model_name>(&mut conn)
                        .await
                        .map_err(::autumn_web::AutumnError::from)?;
                    ::core::result::Result::Ok(::autumn_web::pagination::Page::new(items, total, req))
                } else {
                    let total: i64 = query
                        .count()
                        .get_result::<i64>(&mut conn)
                        .await
                        .map_err(::autumn_web::AutumnError::from)?;
                    let items: ::std::vec::Vec<#model_name> = query
                        .order(#table_ident::id.desc())
                        .limit(req.limit())
                        .offset(req.offset())
                        .select(#model_name::as_select())
                        .load::<#model_name>(&mut conn)
                        .await
                        .map_err(::autumn_web::AutumnError::from)?;
                    ::core::result::Result::Ok(::autumn_web::pagination::Page::new(items, total, req))
                }
            }
        }
    } else {
        quote! {
            // Clears `deleted_at` on the row with the given id.
            async fn restore(&self, id: i64) -> ::autumn_web::AutumnResult<()> {
                use ::autumn_web::reexports::diesel::prelude::*;
                use ::autumn_web::reexports::diesel_async::RunQueryDsl;
                let mut conn = self.__autumn_acquire_conn().await?;
                let query = #table_ident::table.find(id);
                let __count = ::autumn_web::reexports::diesel::update(query)
                    .set(#table_ident::deleted_at.eq(::core::option::Option::None::<::autumn_web::reexports::chrono::NaiveDateTime>))
                    .execute(&mut conn)
                    .await
                    .map_err(::autumn_web::AutumnError::from)?;
                #reject_if_untouched
            }
            // Physically deletes the row with the given id.
            async fn purge(&self, id: i64) -> ::autumn_web::AutumnResult<()> {
                use ::autumn_web::reexports::diesel::prelude::*;
                use ::autumn_web::reexports::diesel_async::RunQueryDsl;
                let mut conn = self.__autumn_acquire_conn().await?;
                let query = #table_ident::table.find(id);
                let __count = ::autumn_web::reexports::diesel::delete(query)
                    .execute(&mut conn)
                    .await
                    .map_err(::autumn_web::AutumnError::from)?;
                #reject_if_untouched
            }
            // Loads every row, with no `deleted_at` filter applied.
            async fn with_deleted(&self) -> ::autumn_web::AutumnResult<::std::vec::Vec<#model_name>> {
                use ::autumn_web::reexports::diesel::prelude::*;
                use ::autumn_web::reexports::diesel_async::RunQueryDsl;
                let mut conn = self.__autumn_acquire_read_conn().await?;
                let query = #table_ident::table;
                query
                    .load::<#model_name>(&mut conn)
                    .await
                    .map_err(::autumn_web::AutumnError::from)
            }
            // Loads only rows whose `deleted_at` is set.
            async fn only_deleted(&self) -> ::autumn_web::AutumnResult<::std::vec::Vec<#model_name>> {
                use ::autumn_web::reexports::diesel::prelude::*;
                use ::autumn_web::reexports::diesel_async::RunQueryDsl;
                let mut conn = self.__autumn_acquire_read_conn().await?;
                let query = #table_ident::table.filter(#table_ident::deleted_at.is_not_null());
                query
                    .load::<#model_name>(&mut conn)
                    .await
                    .map_err(::autumn_web::AutumnError::from)
            }
            // Pages through soft-deleted rows, newest id first: one COUNT query for the total,
            // then one LIMIT/OFFSET query for the items.
            async fn page_only_deleted(
                &self,
                req: &::autumn_web::pagination::PageRequest,
            ) -> ::autumn_web::AutumnResult<::autumn_web::pagination::Page<#model_name>> {
                use ::autumn_web::reexports::diesel::prelude::*;
                use ::autumn_web::reexports::diesel_async::RunQueryDsl;
                let mut conn = self.__autumn_acquire_read_conn().await?;
                let query = #table_ident::table.filter(#table_ident::deleted_at.is_not_null());
                let total: i64 = query
                    .count()
                    .get_result::<i64>(&mut conn)
                    .await
                    .map_err(::autumn_web::AutumnError::from)?;
                let items: ::std::vec::Vec<#model_name> = query
                    .order(#table_ident::id.desc())
                    .limit(req.limit())
                    .offset(req.offset())
                    .select(#model_name::as_select())
                    .load::<#model_name>(&mut conn)
                    .await
                    .map_err(::autumn_web::AutumnError::from)?;
                ::core::result::Result::Ok(::autumn_web::pagination::Page::new(items, total, req))
            }
        }
    }
} else {
    quote! {}
};
// Body spliced into the generated `find_by_id`; it relies on `conn` and `id` already being
// bound at the splice site.
let find_by_id_impl = {
    // Tail shared by every variant: optional soft-delete filter, then fetch at most one row.
    let fetch_first = quote! {
        #sd_filter
        .first::<#model_name>(&mut conn)
        .await
        .optional()
        .map_err(::autumn_web::AutumnError::from)
    };
    if config.tenant_scoped {
        // `None` when running across tenants; otherwise the current tenant, which must exist.
        let resolve_tenant = quote! {
            let tenant_id = if self.across_tenants {
                ::core::option::Option::None
            } else {
                let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
                    .ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
                ::core::option::Option::Some(t)
            };
        };
        quote! {
            #resolve_tenant
            let query = #table_ident::table.find(id);
            if let ::core::option::Option::Some(ref t) = tenant_id {
                query.filter(#table_ident::tenant_id.eq(t))
                #fetch_first
            } else {
                query
                #fetch_first
            }
        }
    } else {
        quote! {
            #table_ident::table
            .find(id)
            #fetch_first
        }
    }
};
// Body spliced into the generated `find_all`; it relies on `conn` already being bound at
// the splice site.
let find_all_impl = {
    // Tail shared by every variant: optional soft-delete filter, then load all matching rows.
    let load_rows = quote! {
        #sd_filter
        .load::<#model_name>(&mut conn)
        .await
        .map_err(::autumn_web::AutumnError::from)
    };
    if config.tenant_scoped {
        // `None` when running across tenants; otherwise the current tenant, which must exist.
        let resolve_tenant = quote! {
            let tenant_id = if self.across_tenants {
                ::core::option::Option::None
            } else {
                let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
                    .ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
                ::core::option::Option::Some(t)
            };
        };
        quote! {
            #resolve_tenant
            let query = #table_ident::table;
            if let ::core::option::Option::Some(ref t) = tenant_id {
                query.filter(#table_ident::tenant_id.eq(t))
                #load_rows
            } else {
                query
                #load_rows
            }
        }
    } else {
        quote! {
            #table_ident::table
            #load_rows
        }
    }
};
// Body spliced into the generated `count`; it relies on `conn` already being bound at the
// splice site.
let count_impl = {
    // Appends the optional soft-delete filter and the COUNT execution to a base query.
    let count_over = |base: TokenStream| {
        quote! {
            #base
            #sd_filter
            .count()
            .get_result::<i64>(&mut conn)
            .await
            .map_err(::autumn_web::AutumnError::from)
        }
    };
    if config.tenant_scoped {
        let within_tenant = count_over(quote! { query.filter(#table_ident::tenant_id.eq(t)) });
        let across_tenants = count_over(quote! { query });
        quote! {
            let tenant_id = if self.across_tenants {
                ::core::option::Option::None
            } else {
                let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
                    .ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
                ::core::option::Option::Some(t)
            };
            let query = #table_ident::table;
            if let ::core::option::Option::Some(ref t) = tenant_id {
                #within_tenant
            } else {
                #across_tenants
            }
        }
    } else {
        count_over(quote! { #table_ident::table })
    }
};
// Body spliced into the generated `exists_by_id`; it relies on `conn` and `id` already being
// bound at the splice site.
let exists_by_id_impl = {
    // Wraps a row query (plus the optional soft-delete filter) in `SELECT EXISTS(..)` and
    // executes it as a `bool`.
    let select_exists = |row_query: TokenStream| {
        quote! {
            ::autumn_web::reexports::diesel::select(
                ::autumn_web::reexports::diesel::dsl::exists(
                    #row_query #sd_filter
                )
            )
            .get_result::<bool>(&mut conn)
            .await
            .map_err(::autumn_web::AutumnError::from)
        }
    };
    if config.tenant_scoped {
        let within_tenant = select_exists(quote! { query.filter(#table_ident::tenant_id.eq(t)) });
        let across_tenants = select_exists(quote! { query });
        quote! {
            let tenant_id = if self.across_tenants {
                ::core::option::Option::None
            } else {
                let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
                    .ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
                ::core::option::Option::Some(t)
            };
            let query = #table_ident::table.find(id);
            if let ::core::option::Option::Some(ref t) = tenant_id {
                #within_tenant
            } else {
                #across_tenants
            }
        }
    } else {
        select_exists(quote! { #table_ident::table.find(id) })
    }
};
// `upsert_many` is declared on the trait only when no hooks type is configured and the
// upsert trait has not been opted out of.
let upsert_many_trait_method = {
    let declare_upsert = !(config.hooks_type.is_some() || config.no_upsert_trait);
    if declare_upsert {
        quote! {
            fn upsert_many(&self, records: &[#model_name])
                -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<Vec<#model_name>>> + Send
            where
                #model_name: ::autumn_web::reexports::diesel::Insertable<#table_ident::table>;
        }
    } else {
        TokenStream::new()
    }
};
// Bulk-operation declarations added to the trait unconditionally; the `upsert_many`
// declaration is appended last and may be empty.
let bulk_trait_methods = quote! {
    fn save_many(&self, new: &[#new_name])
        -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<Vec<#model_name>>> + Send;
    fn save_many_skip_invalid(&self, new: &[#new_name])
        -> impl ::std::future::Future<
            Output = ::autumn_web::AutumnResult<(Vec<#model_name>, Vec<(usize, ::autumn_web::AutumnError)>)>
        > + Send;
    fn update_many(&self, ids: &[i64], changes: &#update_name)
        -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<Vec<#model_name>>> + Send;
    fn delete_many(&self, ids: &[i64])
        -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<()>> + Send;
    #upsert_many_trait_method
};
// Implementation counterpart of the `upsert_many` declaration; generated under the same
// condition (no hooks type, upsert trait not disabled) so trait and impl stay in sync.
let upsert_many_impl_method = {
    let implement_upsert = !(config.hooks_type.is_some() || config.no_upsert_trait);
    if implement_upsert {
        quote! {
            async fn upsert_many(&self, records: &[#model_name])
                -> ::autumn_web::AutumnResult<Vec<#model_name>>
            where
                #model_name: ::autumn_web::reexports::diesel::Insertable<#table_ident::table>
            {
                #upsert_many_body
            }
        }
    } else {
        TokenStream::new()
    }
};
// Plain `bool` copies of the config flags so they can be interpolated into `quote!` as
// `true`/`false` literals (e.g. `if #config_soft_delete { .. }` in the generated search code).
let (config_soft_delete, config_tenant_scoped) = (config.soft_delete, config.tenant_scoped);
// Statement spliced into the second (row-loading) stage of the generated search methods:
// narrows the boxed `records_query` to rows that are not soft-deleted.
let second_stage_soft_delete_filter = if config_soft_delete {
    quote! {
        records_query = records_query.filter(#table_ident::deleted_at.is_null());
    }
} else {
    // Soft delete disabled: no extra filter statement is emitted.
    TokenStream::new()
};
// Statement spliced into the second (row-loading) stage of the generated search methods:
// when a tenant was resolved, narrows the boxed `records_query` to that tenant's rows.
let second_stage_tenant_filter = if config_tenant_scoped {
    quote! {
        if let ::core::option::Option::Some(ref t) = tenant_id {
            records_query = records_query.filter(#table_ident::tenant_id.eq(t.clone()));
        }
    }
} else {
    // Not tenant-scoped: no extra filter statement is emitted.
    TokenStream::new()
};
// Trait declarations for full-text search, emitted only for searchable repositories.
let search_trait_methods = if config.searchable {
    quote! {
        fn search(&self, query: &str)
            -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<Vec<#model_name>>> + Send;
        fn search_page(
            &self,
            query: &str,
            req: &::autumn_web::pagination::PageRequest,
        ) -> impl ::std::future::Future<
            Output = ::autumn_web::AutumnResult<::autumn_web::pagination::Page<#model_name>>
        > + Send;
    }
} else {
    TokenStream::new()
};
// Compile-time guard emitted for searchable repositories. The anonymous `const` fails the
// user's build when the model does not implement `AutumnSearchableModel`, is not flagged as
// searchable, or declares no search fields.
let search_compile_check = if config.searchable {
    quote! {
        const _: () = {
            // Trait-bound check: instantiating this function requires the impl to exist.
            fn assert_searchable<T: ::autumn_web::repository::AutumnSearchableModel>() {}
            let _ = assert_searchable::<#model_name>;
            if !<#model_name as ::autumn_web::repository::AutumnSearchableModel>::IS_SEARCHABLE {
                ::core::panic!("The backing model is not marked with #[searchable] or has no searchable fields configured, but its repository has `searchable = true` enabled.");
            }
            if <#model_name as ::autumn_web::repository::AutumnSearchableModel>::SEARCH_FIELDS.is_empty() {
                ::core::panic!("The backing model is marked with #[searchable] but has zero searchable fields configured, but its repository has `searchable = true` enabled.");
            }
        };
    }
} else {
    TokenStream::new()
};
// Generated bodies for `search` and `search_page`; expands to nothing unless the repository
// is searchable. Both methods work in two stages: a raw SQL query returns the ranked ids,
// then the rows are loaded through the table DSL and put back into ranked order.
let search_impl_methods = if config.searchable {
// Binds `tenant_id` in the generated method. For non-tenant repositories it is a typed
// `None` so the shared code below still type-checks.
let tenant_id_setup = if config.tenant_scoped {
quote! {
let tenant_id = if self.across_tenants {
::core::option::Option::None
} else {
let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
.ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
::core::option::Option::Some(t)
};
}
} else {
quote! {
let tenant_id = ::core::option::Option::None::<::std::string::String>;
}
};
quote! {
async fn search(&self, query: &str) -> ::autumn_web::AutumnResult<Vec<#model_name>> {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
// A blank query short-circuits to an empty result without touching the database.
if query.trim().is_empty() {
return Ok(Vec::new());
}
#[derive(::autumn_web::reexports::diesel::QueryableByName)]
struct SearchId {
#[diesel(sql_type = ::autumn_web::reexports::diesel::sql_types::BigInt)]
id: i64,
}
#tenant_id_setup
let mut conn = self.__autumn_acquire_read_conn().await?;
let language = <#model_name as ::autumn_web::repository::AutumnSearchableModel>::SEARCH_LANGUAGE;
// Stage 1: ranked id lookup. Placeholders: $1 = language, $2 = query text,
// $3 = tenant id (only appended when a tenant is present).
let mut sql = format!(
"SELECT id FROM \"{}\" WHERE search_vector @@ websearch_to_tsquery($1::regconfig, $2)",
#table_name
);
if #config_soft_delete {
sql.push_str(" AND deleted_at IS NULL");
}
if let ::core::option::Option::Some(ref _t) = tenant_id {
sql.push_str(" AND tenant_id = $3");
}
sql.push_str(" ORDER BY ts_rank_cd(search_vector, websearch_to_tsquery($1::regconfig, $2)) DESC, id DESC");
// The bind calls in each branch must match the placeholders appended above.
let ids = if #config_tenant_scoped {
if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::sql_query(sql)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(language)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(query)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(t)
.load::<SearchId>(&mut conn)
.await?
} else {
::autumn_web::reexports::diesel::sql_query(sql)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(language)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(query)
.load::<SearchId>(&mut conn)
.await?
}
} else {
::autumn_web::reexports::diesel::sql_query(sql)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(language)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(query)
.load::<SearchId>(&mut conn)
.await?
};
let id_list: Vec<i64> = ids.into_iter().map(|s| s.id).collect();
if id_list.is_empty() {
return Ok(Vec::new());
}
// Stage 2: load the full rows for the matched ids, re-applying the soft-delete and
// tenant filters on the DSL query.
let mut records_query = #table_ident::table
.filter(#table_ident::id.eq_any(&id_list))
.into_boxed();
#second_stage_soft_delete_filter
#second_stage_tenant_filter
let records = records_query
.load::<#model_name>(&mut conn)
.await?;
// Restore the ranked order from stage 1; ids with no loaded row are dropped.
let mut record_map: ::std::collections::HashMap<i64, #model_name> = records
.into_iter()
.map(|r| (r.id, r))
.collect();
let sorted_records: Vec<#model_name> = id_list
.iter()
.filter_map(|id| record_map.remove(id))
.collect();
Ok(sorted_records)
}
async fn search_page(
&self,
query: &str,
req: &::autumn_web::pagination::PageRequest,
) -> ::autumn_web::AutumnResult<::autumn_web::pagination::Page<#model_name>> {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
// A blank query short-circuits to an empty page with a total of 0.
if query.trim().is_empty() {
return Ok(::autumn_web::pagination::Page::new(Vec::new(), 0, req));
}
#[derive(::autumn_web::reexports::diesel::QueryableByName)]
struct SearchId {
#[diesel(sql_type = ::autumn_web::reexports::diesel::sql_types::BigInt)]
id: i64,
}
#[derive(::autumn_web::reexports::diesel::QueryableByName)]
struct SearchCount {
#[diesel(sql_type = ::autumn_web::reexports::diesel::sql_types::BigInt)]
count: i64,
}
#tenant_id_setup
let mut conn = self.__autumn_acquire_read_conn().await?;
let language = <#model_name as ::autumn_web::repository::AutumnSearchableModel>::SEARCH_LANGUAGE;
let limit = req.limit();
let offset = req.offset();
// Total number of matches. Placeholders: $1 = language, $2 = query text,
// $3 = tenant id (only appended when a tenant is present).
let mut count_sql = format!(
"SELECT COUNT(*) AS count FROM \"{}\" WHERE search_vector @@ websearch_to_tsquery($1::regconfig, $2)",
#table_name
);
if #config_soft_delete {
count_sql.push_str(" AND deleted_at IS NULL");
}
if let ::core::option::Option::Some(ref _t) = tenant_id {
count_sql.push_str(" AND tenant_id = $3");
}
let total = if #config_tenant_scoped {
if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::sql_query(count_sql)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(language)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(query)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(t)
.get_result::<SearchCount>(&mut conn)
.await?
.count
} else {
::autumn_web::reexports::diesel::sql_query(count_sql)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(language)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(query)
.get_result::<SearchCount>(&mut conn)
.await?
.count
}
} else {
::autumn_web::reexports::diesel::sql_query(count_sql)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(language)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(query)
.get_result::<SearchCount>(&mut conn)
.await?
.count
};
// Ranked ids for the requested page. LIMIT/OFFSET placeholder numbers shift by one
// when the tenant placeholder ($3) is present: $4/$5 with a tenant, $3/$4 without.
let mut select_sql = format!(
"SELECT id FROM \"{}\" WHERE search_vector @@ websearch_to_tsquery($1::regconfig, $2)",
#table_name
);
if #config_soft_delete {
select_sql.push_str(" AND deleted_at IS NULL");
}
if let ::core::option::Option::Some(ref _t) = tenant_id {
select_sql.push_str(" AND tenant_id = $3");
select_sql.push_str(" ORDER BY ts_rank_cd(search_vector, websearch_to_tsquery($1::regconfig, $2)) DESC, id DESC");
select_sql.push_str(" LIMIT $4 OFFSET $5");
} else {
select_sql.push_str(" ORDER BY ts_rank_cd(search_vector, websearch_to_tsquery($1::regconfig, $2)) DESC, id DESC");
select_sql.push_str(" LIMIT $3 OFFSET $4");
}
// The bind calls in each branch must match the placeholders appended above.
let ids = if #config_tenant_scoped {
if let ::core::option::Option::Some(ref t) = tenant_id {
::autumn_web::reexports::diesel::sql_query(select_sql)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(language)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(query)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(t)
.bind::<::autumn_web::reexports::diesel::sql_types::BigInt, _>(limit)
.bind::<::autumn_web::reexports::diesel::sql_types::BigInt, _>(offset)
.load::<SearchId>(&mut conn)
.await?
} else {
::autumn_web::reexports::diesel::sql_query(select_sql)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(language)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(query)
.bind::<::autumn_web::reexports::diesel::sql_types::BigInt, _>(limit)
.bind::<::autumn_web::reexports::diesel::sql_types::BigInt, _>(offset)
.load::<SearchId>(&mut conn)
.await?
}
} else {
::autumn_web::reexports::diesel::sql_query(select_sql)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(language)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(query)
.bind::<::autumn_web::reexports::diesel::sql_types::BigInt, _>(limit)
.bind::<::autumn_web::reexports::diesel::sql_types::BigInt, _>(offset)
.load::<SearchId>(&mut conn)
.await?
};
let id_list: Vec<i64> = ids.into_iter().map(|s| s.id).collect();
// An empty page still reports the overall match count.
if id_list.is_empty() {
return Ok(::autumn_web::pagination::Page::new(Vec::new(), total, req));
}
// Stage 2: load the full rows for this page's ids, re-applying the soft-delete and
// tenant filters on the DSL query.
let mut records_query = #table_ident::table
.filter(#table_ident::id.eq_any(&id_list))
.into_boxed();
#second_stage_soft_delete_filter
#second_stage_tenant_filter
let records = records_query
.load::<#model_name>(&mut conn)
.await?;
// Restore the ranked order from stage 1; ids with no loaded row are dropped.
let mut record_map: ::std::collections::HashMap<i64, #model_name> = records
.into_iter()
.map(|r| (r.id, r))
.collect();
let sorted_records: Vec<#model_name> = id_list
.iter()
.filter_map(|id| record_map.remove(id))
.collect();
Ok(::autumn_web::pagination::Page::new(sorted_records, total, req))
}
}
} else {
quote! {}
};
// Extension fragments that currently expand to nothing; each is an empty token stream.
let upsert_set_ext_impl = TokenStream::new();
let upsert_execution_ext_impl = TokenStream::new();
let correlate_ext_impl = TokenStream::new();
// `impl VersionedRecord for <Model>`, generated for versioned repositories unless the
// caller opted out with `no_versioned_record_impl`.
//
// Sensitive column names come from the trait's `#[version_history(sensitive = [..])]`
// attribute; a malformed attribute aborts expansion with the parser's compile error.
let versioned_record_impl = if config.versioned && !config.no_versioned_record_impl {
    let sensitive_cols = match parse_version_history_sensitive(&trait_def.attrs) {
        Ok(cols) => cols,
        Err(err) => return err.to_compile_error(),
    };
    // Expands to a slice literal of string literals; an empty list yields `&[]`, so no
    // special case is needed.
    let sensitive_ts = quote! { &[#(#sensitive_cols),*] };
    // Only tenant-scoped models expose their tenant id to version history.
    let tenant_id_method = if config.tenant_scoped {
        quote! {
            fn version_tenant_id(&self) -> ::core::option::Option<&str> {
                ::autumn_web::version_history::VersionTenantIdValue::version_tenant_id(&self.tenant_id)
            }
        }
    } else {
        quote! {}
    };
    quote! {
        impl ::autumn_web::version_history::VersionedRecord for #model_name {
            fn version_table_name() -> &'static str {
                #table_name
            }
            fn version_record_id(&self) -> i64 {
                self.id
            }
            fn version_column_values(&self) -> ::autumn_web::reexports::serde_json::Value {
                // A serialization failure falls back to an empty JSON object. The fallback is
                // built lazily and through a fully qualified `Default` path so it cannot be
                // affected by names in the caller's scope.
                let mut __vh_value = ::autumn_web::reexports::serde_json::to_value(self)
                    .unwrap_or_else(|_| {
                        ::autumn_web::reexports::serde_json::Value::Object(
                            ::core::default::Default::default()
                        )
                    });
                ::autumn_web::encryption::encrypt_versioned_columns_in_value(
                    #table_name,
                    &mut __vh_value,
                );
                __vh_value
            }
            fn version_sensitive_columns() -> &'static [&'static str] {
                // Computed once per model type: the declared columns, then whatever
                // `merge_encrypted_columns_for_table` adds for this table.
                static __VH_SENSITIVE: ::std::sync::OnceLock<::std::vec::Vec<&'static str>> =
                    ::std::sync::OnceLock::new();
                __VH_SENSITIVE
                    .get_or_init(|| {
                        let declared: &[&'static str] = #sensitive_ts;
                        let mut cols: ::std::vec::Vec<&'static str> = declared.to_vec();
                        ::autumn_web::encryption::merge_encrypted_columns_for_table(
                            #table_name,
                            &mut cols,
                        );
                        cols
                    })
                    .as_slice()
            }
            #tenant_id_method
        }
    }
} else {
    quote! {}
};
// Statements that bind `__version_history_tenant_id: Option<&str>` inside the generated
// `version_history` method.
let version_history_tenant_setup = if config.tenant_scoped {
    // First resolve an owned tenant id (`None` when running across tenants) ...
    let resolve_owned = quote! {
        let __version_history_tenant_id = if self.across_tenants {
            ::core::option::Option::None
        } else {
            let t = ::autumn_web::tenancy::CURRENT_TENANT.try_with(|t| t.clone()).ok().flatten()
                .ok_or_else(|| ::autumn_web::AutumnError::internal_server_error_msg("Query scoped to tenant, but no tenant context was established"))?;
            ::core::option::Option::Some(t)
        };
    };
    // ... then shadow it with a borrowed view for binding into the SQL queries.
    let borrow_as_str = quote! {
        let __version_history_tenant_id = __version_history_tenant_id.as_deref();
    };
    quote! {
        #resolve_owned
        #borrow_as_str
    }
} else {
    // Not tenant-scoped: the tenant filter is always `None`.
    quote! {
        let __version_history_tenant_id: ::core::option::Option<&str> = ::core::option::Option::None;
    }
};
// Inherent `version_history` method on the generated repository struct, emitted only for
// versioned repositories. It reads `_autumn_version_history` with raw SQL: one COUNT query
// for the total, then one paged query for the entries, oldest first.
let versioned_history_impl = if config.versioned {
quote! {
impl #pg_name {
pub async fn version_history(
&self,
record_id: i64,
filter: ::autumn_web::version_history::VersionFilter,
) -> ::autumn_web::AutumnResult<::autumn_web::version_history::VersionPage> {
use ::autumn_web::reexports::diesel_async::RunQueryDsl as _;
// Raw row shape; `changes` is selected as text and parsed into column changes below.
#[derive(::autumn_web::reexports::diesel::QueryableByName)]
struct __AutumnVersionHistoryRow {
#[diesel(sql_type = ::autumn_web::reexports::diesel::sql_types::BigInt)]
id: i64,
#[diesel(sql_type = ::autumn_web::reexports::diesel::sql_types::Text)]
table_name: ::std::string::String,
#[diesel(sql_type = ::autumn_web::reexports::diesel::sql_types::BigInt)]
record_id: i64,
#[diesel(sql_type = ::autumn_web::reexports::diesel::sql_types::Text)]
op: ::std::string::String,
#[diesel(sql_type = ::autumn_web::reexports::diesel::sql_types::Text)]
actor: ::std::string::String,
#[diesel(sql_type = ::autumn_web::reexports::diesel::sql_types::Nullable<::autumn_web::reexports::diesel::sql_types::Text>)]
request_id: ::core::option::Option<::std::string::String>,
#[diesel(sql_type = ::autumn_web::reexports::diesel::sql_types::Text)]
changes: ::std::string::String,
#[diesel(sql_type = ::autumn_web::reexports::diesel::sql_types::Timestamptz)]
recorded_at: ::autumn_web::reexports::chrono::DateTime<::autumn_web::reexports::chrono::Utc>,
}
#[derive(::autumn_web::reexports::diesel::QueryableByName)]
struct __AutumnVersionHistoryCount {
#[diesel(sql_type = ::autumn_web::reexports::diesel::sql_types::BigInt)]
count: i64,
}
let __table_name: &str = #table_name;
let (limit, offset) = filter.limit_offset();
let page = filter.page();
let per_page = filter.per_page();
#version_history_tenant_setup
let mut conn = self.__autumn_acquire_read_conn().await?;
// Total matching entries. The time-range predicates are only part of the SQL when
// a bound is set. Placeholders: $1 table, $2 record id, $3 tenant (NULL = any),
// $4/$5 optional from/to bounds.
let total: u64 = if filter.from.is_some() || filter.to.is_some() {
let rows = ::autumn_web::reexports::diesel::sql_query(
"SELECT COUNT(*)::bigint AS count \
FROM _autumn_version_history \
WHERE table_name = $1 AND record_id = $2 \
AND ($3::text IS NULL OR tenant_id = $3) \
AND ($4::timestamptz IS NULL OR recorded_at >= $4) \
AND ($5::timestamptz IS NULL OR recorded_at <= $5)"
)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(__table_name)
.bind::<::autumn_web::reexports::diesel::sql_types::BigInt, _>(record_id)
.bind::<::autumn_web::reexports::diesel::sql_types::Nullable<::autumn_web::reexports::diesel::sql_types::Text>, _>(__version_history_tenant_id)
.bind::<::autumn_web::reexports::diesel::sql_types::Nullable<::autumn_web::reexports::diesel::sql_types::Timestamptz>, _>(filter.from)
.bind::<::autumn_web::reexports::diesel::sql_types::Nullable<::autumn_web::reexports::diesel::sql_types::Timestamptz>, _>(filter.to)
.get_results::<__AutumnVersionHistoryCount>(&mut conn)
.await
.map_err(::autumn_web::AutumnError::from)?;
// A missing row counts as 0; the value is clamped to non-negative before the cast.
rows.into_iter().next().map(|r| r.count).unwrap_or(0).max(0) as u64
} else {
let rows = ::autumn_web::reexports::diesel::sql_query(
"SELECT COUNT(*)::bigint AS count \
FROM _autumn_version_history \
WHERE table_name = $1 AND record_id = $2 \
AND ($3::text IS NULL OR tenant_id = $3)"
)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(__table_name)
.bind::<::autumn_web::reexports::diesel::sql_types::BigInt, _>(record_id)
.bind::<::autumn_web::reexports::diesel::sql_types::Nullable<::autumn_web::reexports::diesel::sql_types::Text>, _>(__version_history_tenant_id)
.get_results::<__AutumnVersionHistoryCount>(&mut conn)
.await
.map_err(::autumn_web::AutumnError::from)?;
// A missing row counts as 0; the value is clamped to non-negative before the cast.
rows.into_iter().next().map(|r| r.count).unwrap_or(0).max(0) as u64
};
// Page of entries, oldest first. LIMIT/OFFSET are $6/$7 when the time-range
// predicates are present and $4/$5 otherwise; the bind order must match.
let raw_rows: Vec<__AutumnVersionHistoryRow> = if filter.from.is_some() || filter.to.is_some() {
::autumn_web::reexports::diesel::sql_query(
"SELECT id, table_name, record_id, op, actor, request_id, \
changes::text AS changes, recorded_at \
FROM _autumn_version_history \
WHERE table_name = $1 AND record_id = $2 \
AND ($3::text IS NULL OR tenant_id = $3) \
AND ($4::timestamptz IS NULL OR recorded_at >= $4) \
AND ($5::timestamptz IS NULL OR recorded_at <= $5) \
ORDER BY recorded_at ASC, id ASC \
LIMIT $6 OFFSET $7"
)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(__table_name)
.bind::<::autumn_web::reexports::diesel::sql_types::BigInt, _>(record_id)
.bind::<::autumn_web::reexports::diesel::sql_types::Nullable<::autumn_web::reexports::diesel::sql_types::Text>, _>(__version_history_tenant_id)
.bind::<::autumn_web::reexports::diesel::sql_types::Nullable<::autumn_web::reexports::diesel::sql_types::Timestamptz>, _>(filter.from)
.bind::<::autumn_web::reexports::diesel::sql_types::Nullable<::autumn_web::reexports::diesel::sql_types::Timestamptz>, _>(filter.to)
.bind::<::autumn_web::reexports::diesel::sql_types::BigInt, _>(limit)
.bind::<::autumn_web::reexports::diesel::sql_types::BigInt, _>(offset)
.get_results::<__AutumnVersionHistoryRow>(&mut conn)
.await
.map_err(::autumn_web::AutumnError::from)?
} else {
::autumn_web::reexports::diesel::sql_query(
"SELECT id, table_name, record_id, op, actor, request_id, \
changes::text AS changes, recorded_at \
FROM _autumn_version_history \
WHERE table_name = $1 AND record_id = $2 \
AND ($3::text IS NULL OR tenant_id = $3) \
ORDER BY recorded_at ASC, id ASC \
LIMIT $4 OFFSET $5"
)
.bind::<::autumn_web::reexports::diesel::sql_types::Text, _>(__table_name)
.bind::<::autumn_web::reexports::diesel::sql_types::BigInt, _>(record_id)
.bind::<::autumn_web::reexports::diesel::sql_types::Nullable<::autumn_web::reexports::diesel::sql_types::Text>, _>(__version_history_tenant_id)
.bind::<::autumn_web::reexports::diesel::sql_types::BigInt, _>(limit)
.bind::<::autumn_web::reexports::diesel::sql_types::BigInt, _>(offset)
.get_results::<__AutumnVersionHistoryRow>(&mut conn)
.await
.map_err(::autumn_web::AutumnError::from)?
};
let entries: Vec<::autumn_web::version_history::VersionEntry> = raw_rows
.into_iter()
.map(|row| {
// Any op string other than "insert"/"delete" is treated as an update.
let op = match row.op.as_str() {
"insert" => ::autumn_web::version_history::VersionOp::Insert,
"delete" => ::autumn_web::version_history::VersionOp::Delete,
_ => ::autumn_web::version_history::VersionOp::Update,
};
// NOTE(review): a `changes` payload that fails to parse silently becomes an
// empty list instead of an error; confirm this best-effort behavior is intended.
let changes: Vec<::autumn_web::version_history::ColumnChange> =
::autumn_web::reexports::serde_json::from_str(&row.changes)
.unwrap_or_default();
::autumn_web::version_history::VersionEntry {
id: row.id,
table_name: row.table_name,
record_id: row.record_id,
op,
actor: row.actor,
request_id: row.request_id,
changes,
recorded_at: row.recorded_at,
}
})
.collect();
Ok(::autumn_web::version_history::VersionPage {
entries,
total,
page,
per_page,
})
}
}
}
} else {
quote! {}
};
quote! {
#vis trait #trait_name: Send + Sync {
fn find_by_id(&self, id: i64) -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<Option<#model_name>>> + Send;
fn find_all(&self) -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<Vec<#model_name>>> + Send;
fn save(&self, new: &#new_name) -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<#model_name>> + Send;
fn update(&self, id: i64, changes: &#update_name) -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<#model_name>> + Send;
fn delete_by_id(&self, id: i64) -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<()>> + Send;
fn count(&self) -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<i64>> + Send;
fn exists_by_id(&self, id: i64) -> impl ::std::future::Future<Output = ::autumn_web::AutumnResult<bool>> + Send;
#pagination_trait_method
#cursor_page_trait_method
#(#derived_trait_methods)*
#soft_delete_trait_methods
#bulk_trait_methods
#search_trait_methods
}
#vis struct #pg_name {
#struct_fields
}
impl ::autumn_web::reexports::axum::extract::FromRequestParts<::autumn_web::AppState> for #pg_name {
type Rejection = ::autumn_web::AutumnError;
async fn from_request_parts(
_parts: &mut ::autumn_web::reexports::http::request::Parts,
state: &::autumn_web::AppState,
) -> Result<Self, Self::Rejection> {
let pool = state.pool()
.ok_or_else(|| ::autumn_web::AutumnError::service_unavailable_msg("No database pool configured"))?
.clone();
#extractor_init
}
}
#clone_impl
impl #trait_name for #pg_name {
async fn find_by_id(&self, id: i64) -> ::autumn_web::AutumnResult<Option<#model_name>> {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
let mut conn = self.__autumn_acquire_read_conn().await?;
#find_by_id_impl
}
async fn find_all(&self) -> ::autumn_web::AutumnResult<Vec<#model_name>> {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
let mut conn = self.__autumn_acquire_read_conn().await?;
#find_all_impl
}
async fn save(&self, new: &#new_name) -> ::autumn_web::AutumnResult<#model_name> {
#save_body
}
async fn update(&self, id: i64, changes: &#update_name) -> ::autumn_web::AutumnResult<#model_name> {
#update_body
}
async fn delete_by_id(&self, id: i64) -> ::autumn_web::AutumnResult<()> {
#delete_body
}
async fn count(&self) -> ::autumn_web::AutumnResult<i64> {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
let mut conn = self.__autumn_acquire_read_conn().await?;
#count_impl
}
async fn exists_by_id(&self, id: i64) -> ::autumn_web::AutumnResult<bool> {
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
let mut conn = self.__autumn_acquire_read_conn().await?;
#exists_by_id_impl
}
#pagination_impl_method
#cursor_page_impl_method
#(#derived_impl_methods)*
#soft_delete_impl_methods
async fn save_many(&self, new: &[#new_name]) -> ::autumn_web::AutumnResult<Vec<#model_name>> {
#save_many_body
}
async fn save_many_skip_invalid(&self, new: &[#new_name]) -> ::autumn_web::AutumnResult<(Vec<#model_name>, Vec<(usize, ::autumn_web::AutumnError)>)> {
#save_many_skip_invalid_body
}
async fn update_many(&self, ids: &[i64], changes: &#update_name) -> ::autumn_web::AutumnResult<Vec<#model_name>> {
#update_many_body
}
async fn delete_many(&self, ids: &[i64]) -> ::autumn_web::AutumnResult<()> {
#delete_many_body
}
#upsert_many_impl_method
#search_impl_methods
}
impl #pg_name {
#across_tenants_method
#hook_support_methods
#[must_use]
pub fn on_primary(&self) -> Self {
let mut repo = ::core::clone::Clone::clone(self);
repo.__autumn_read_route = ::autumn_web::repository::ReadRoute::Primary;
repo
}
#[doc(hidden)]
pub fn __autumn_read_route(&self) -> &::autumn_web::repository::ReadRoute {
&self.__autumn_read_route
}
#[doc(hidden)]
pub fn __autumn_write_pool(
&self,
) -> &::autumn_web::reexports::diesel_async::pooled_connection::deadpool::Pool<
::autumn_web::reexports::diesel_async::AsyncPgConnection,
> {
&self.pool
}
#[doc(hidden)]
pub async fn __autumn_acquire_conn(
&self,
) -> ::autumn_web::AutumnResult<
::autumn_web::reexports::diesel_async::pooled_connection::deadpool::Object<
::autumn_web::reexports::diesel_async::AsyncPgConnection,
>,
> {
self.__autumn_acquire_from(&self.pool).await
}
#[doc(hidden)]
pub async fn __autumn_acquire_read_conn(
&self,
) -> ::autumn_web::AutumnResult<
::autumn_web::reexports::diesel_async::pooled_connection::deadpool::Object<
::autumn_web::reexports::diesel_async::AsyncPgConnection,
>,
> {
match &self.__autumn_read_route {
::autumn_web::repository::ReadRoute::Primary => {
self.__autumn_acquire_from(&self.pool).await
}
::autumn_web::repository::ReadRoute::ReadPool(pool) => {
self.__autumn_acquire_from(pool).await
}
::autumn_web::repository::ReadRoute::Unavailable => {
::core::result::Result::Err(
::autumn_web::AutumnError::service_unavailable_msg(
"read replica is configured but not ready, and the \
replica_fallback policy forbids primary reads",
),
)
}
}
}
async fn __autumn_acquire_from(
&self,
pool: &::autumn_web::reexports::diesel_async::pooled_connection::deadpool::Pool<
::autumn_web::reexports::diesel_async::AsyncPgConnection,
>,
) -> ::autumn_web::AutumnResult<
::autumn_web::reexports::diesel_async::pooled_connection::deadpool::Object<
::autumn_web::reexports::diesel_async::AsyncPgConnection,
>,
> {
use ::autumn_web::reexports::diesel_async::RunQueryDsl as _;
let mut conn = pool.get().await.map_err(|e| {
::autumn_web::reexports::tracing::error!(
"repository: failed to acquire database connection: {e}"
);
::autumn_web::AutumnError::service_unavailable_msg(
::std::format!("Database connection error: {e}")
)
})?;
let timeout_ms = self.__autumn_statement_timeout_ms;
let timeout_ms = timeout_ms.min(i32::MAX as u64);
::autumn_web::reexports::diesel::sql_query(
::std::format!("SET statement_timeout = {timeout_ms}")
)
.execute(&mut conn)
.await
.map_err(|e| {
::autumn_web::reexports::tracing::error!(
"repository: failed to set statement_timeout to {timeout_ms}ms: {e}"
);
::autumn_web::AutumnError::service_unavailable_msg(
::std::format!("Database initialization error: {e}")
)
})?;
::core::result::Result::Ok(conn)
}
#[doc(hidden)]
#[inline]
fn __autumn_route_label(&self) -> &str {
self.__autumn_route.as_deref().unwrap_or("unknown")
}
pub async fn with_lock<F, T>(&self, id: i64, f: F) -> ::autumn_web::AutumnResult<T>
where
F: for<'c> ::core::ops::FnOnce(
#model_name,
&'c mut ::autumn_web::reexports::diesel_async::AsyncPgConnection,
) -> ::autumn_web::reexports::scoped_futures::ScopedBoxFuture<'c, 'c, ::autumn_web::AutumnResult<T>>
+ ::core::marker::Send + 'static,
T: ::core::marker::Send + 'static,
{
use ::autumn_web::reexports::diesel::prelude::*;
use ::autumn_web::reexports::diesel_async::RunQueryDsl;
use ::autumn_web::reexports::diesel_async::AsyncConnection;
use ::autumn_web::reexports::scoped_futures::ScopedFutureExt as _;
let mut conn = self.__autumn_acquire_conn().await?;
conn.transaction::<T, ::autumn_web::AutumnError, _>(|conn| {
async move {
let row = #table_ident::table
.find(id)
.for_update()
.first::<#model_name>(conn)
.await
.optional()
.map_err(::autumn_web::AutumnError::from)?
.ok_or_else(|| ::autumn_web::AutumnError::not_found_msg(
format!("{} with id {} not found", stringify!(#model_name), id)
))?;
f(row, conn).await
}
.scope_boxed()
})
.await
}
}
#hook_inventory_registration
#versioned_inventory_registration
#api_handlers
#tenant_scoped_traits
#upsert_set_ext_impl
#upsert_execution_ext_impl
#correlate_ext_impl
#versioned_record_impl
#versioned_history_impl
#search_compile_check
}
}
#[cfg(test)]
mod tests {
use super::*;
/// A `find_by_<field>` name parses to the `find` prefix with exactly one field.
#[test]
fn parse_find_by_single_field() {
    let parsed = parse_query_name("find_by_title").unwrap();
    assert_eq!(parsed.prefix, "find");
    assert_eq!(parsed.fields, vec!["title"]);
}
/// Two fields joined by `_and_` are split in order and the combinator is
/// reported as `and`.
#[test]
fn parse_find_by_two_fields() {
    let parsed = parse_query_name("find_by_title_and_published").unwrap();
    assert_eq!(parsed.prefix, "find");
    assert_eq!(parsed.fields, vec!["title", "published"]);
    assert_eq!(parsed.combinator, "and");
}
/// A `count_by_<field>` name parses to the `count` prefix with that field.
#[test]
fn parse_count_by() {
    let parsed = parse_query_name("count_by_published").unwrap();
    assert_eq!(parsed.prefix, "count");
    assert_eq!(parsed.fields, vec!["published"]);
}
/// A `delete_by_<field>` name parses to the `delete` prefix.
#[test]
fn parse_delete_by() {
    let parsed = parse_query_name("delete_by_published").unwrap();
    assert_eq!(parsed.prefix, "delete");
}
/// An `exists_by_<field>` name parses to the `exists` prefix.
#[test]
fn parse_exists_by() {
    let parsed = parse_query_name("exists_by_title").unwrap();
    assert_eq!(parsed.prefix, "exists");
}
/// Method names that are not derived-query names are not parsed.
#[test]
fn parse_unknown_returns_none() {
    let save = parse_query_name("save");
    assert!(save.is_none());
    let custom = parse_query_name("custom_method");
    assert!(custom.is_none());
}
/// A name mixing `_and_` with `_or_` is rejected (parses to `None`).
#[test]
fn mixed_and_or_returns_none() {
    let mixed = parse_query_name("find_by_a_and_b_or_c");
    assert!(mixed.is_none());
}
/// `hooks = <Type>` is recorded on the config and leaves `commit_hooks` off.
#[test]
fn parse_repo_args_with_hooks() {
    let args = "Post, hooks = PostHooks"
        .parse::<proc_macro2::TokenStream>()
        .unwrap();
    let cfg = parse_repo_args(args).unwrap();
    assert_eq!(cfg.model_name.to_string(), "Post");
    let hooks_name = cfg
        .hooks_type
        .as_ref()
        .map(std::string::ToString::to_string);
    assert_eq!(hooks_name, Some("PostHooks".to_string()));
    assert!(
        !cfg.commit_hooks,
        "ordinary hooks must not opt into the durable commit-hook queue"
    );
}
/// `commit_hooks = true` alongside `hooks = <Type>` sets both on the config.
#[test]
fn parse_repo_args_with_commit_hooks() {
    let args = "Post, hooks = PostHooks, commit_hooks = true"
        .parse::<proc_macro2::TokenStream>()
        .unwrap();
    let cfg = parse_repo_args(args).unwrap();
    assert_eq!(cfg.model_name.to_string(), "Post");
    assert!(cfg.hooks_type.is_some());
    assert!(cfg.commit_hooks);
}
/// `commit_hooks = true` without a `hooks` type is a parse error whose message
/// mentions "requires hooks".
#[test]
fn parse_repo_args_rejects_commit_hooks_without_hooks() {
    let args = "Post, commit_hooks = true"
        .parse::<proc_macro2::TokenStream>()
        .unwrap();
    let err = match parse_repo_args(args) {
        Err(err) => err,
        Ok(_) => panic!("commit hooks require a hook type"),
    };
    let mentions_hooks = err.to_string().contains("requires hooks");
    assert!(mentions_hooks, "unexpected error: {err}");
}
/// With `api` and `policy` set, the generated output must contain exactly two
/// `repo.on_primary().find_by_id` call sites, while the GET handler must call
/// `repo.find_by_id` without `on_primary`.
#[test]
fn repository_macro_api_mutation_prechecks_pin_reads_to_primary() {
let generated = repository_macro(
quote! { Post, api = "/api/posts", policy = PostPolicy },
quote! { pub trait PostRepository {} },
)
.to_string();
// Needles are spelled with spaces between tokens because they are matched
// against the stringified token stream.
assert_eq!(
generated
.matches("repo . on_primary () . find_by_id")
.count(),
2,
"update/delete prechecks must pin find_by_id to the primary"
);
// Isolate the GET handler: from its `async fn` up to the start of the create
// handler, or to the end of the output when no create handler follows.
let get_fn = generated
.find("async fn post_api_get")
.expect("get handler must be generated");
let get_end = generated[get_fn..]
.find("async fn post_api_create")
.map_or(generated.len(), |offset| get_fn + offset);
let section = &generated[get_fn..get_end];
assert!(
section.contains("repo . find_by_id"),
"get handler must read through the repository: {section}"
);
assert!(
!section.contains("on_primary"),
"get handler reads must stay replica-eligible: {section}"
);
}
/// The bare `primary_reads` flag sets `primary_reads` on the config.
#[test]
fn parse_repo_args_with_primary_reads() {
    let args = "Post, primary_reads"
        .parse::<proc_macro2::TokenStream>()
        .unwrap();
    let cfg = parse_repo_args(args).unwrap();
    assert_eq!(cfg.model_name.to_string(), "Post");
    assert!(
        cfg.primary_reads,
        "primary_reads must pin generated reads to the primary pool"
    );
}
/// Without the flag, `primary_reads` is `false`.
#[test]
fn parse_repo_args_primary_reads_defaults_off() {
    let args = "Post".parse::<proc_macro2::TokenStream>().unwrap();
    let cfg = parse_repo_args(args).unwrap();
    assert!(
        !cfg.primary_reads,
        "reads route to the replica by default"
    );
}
/// A model-only argument list leaves `hooks_type` unset.
#[test]
fn parse_repo_args_without_hooks() {
    let args = "Post".parse::<proc_macro2::TokenStream>().unwrap();
    let cfg = parse_repo_args(args).unwrap();
    assert_eq!(cfg.model_name.to_string(), "Post");
    assert!(cfg.hooks_type.is_none());
}
/// An explicit `table = "..."` and a `hooks` type are both captured.
#[test]
fn parse_repo_args_with_table_and_hooks() {
    let args = r#"Post, table = "blog_posts", hooks = PostHooks"#
        .parse::<proc_macro2::TokenStream>()
        .unwrap();
    let cfg = parse_repo_args(args).unwrap();
    assert_eq!(cfg.model_name.to_string(), "Post");
    assert_eq!(cfg.table_name, "blog_posts");
    let hooks_name = cfg
        .hooks_type
        .as_ref()
        .map(std::string::ToString::to_string);
    assert_eq!(hooks_name, Some("PostHooks".to_string()));
}
/// `api = "<path>"` is stored as `api_path`.
#[test]
fn parse_repo_args_with_api() {
    let args = r#"Post, api = "/api/posts""#
        .parse::<proc_macro2::TokenStream>()
        .unwrap();
    let cfg = parse_repo_args(args).unwrap();
    assert_eq!(cfg.model_name.to_string(), "Post");
    assert_eq!(cfg.api_path.as_deref(), Some("/api/posts"));
}
/// With `api` and `policy` set, the generated output must mention the
/// idempotency-replay identifiers checked below.
#[test]
fn policy_repository_api_replays_after_generated_policy_checks() {
    let attr = quote! { Post, api = "/api/posts", policy = PostPolicy };
    let item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(attr, item).to_string();

    let consumes_replay = generated.contains("__replay_response");
    assert!(
        consumes_replay,
        "policy-backed repository routes must consume cached replays after policy checks: {generated}"
    );

    let has_wrapper = generated.contains("IdempotencyReplayOr");
    assert!(
        has_wrapper,
        "policy-backed repository mutations need a response wrapper for post-policy replay: {generated}"
    );

    // All three markers must be present for the delete-retry path.
    let delete_markers = [
        "__replay_metadata",
        "repository.delete.record",
        "InnerWithReplayMetadata",
    ];
    let carries_deleted_record = delete_markers
        .iter()
        .all(|needle| generated.contains(*needle));
    assert!(
        carries_deleted_record,
        "policy-backed delete retries must carry the deleted record so policy checks can run before replay: {generated}"
    );
}
/// `hooks` and `api` can be combined in one argument list.
#[test]
fn parse_repo_args_with_hooks_and_api() {
    let args = r#"Post, hooks = PostHooks, api = "/api/v1/posts""#
        .parse::<proc_macro2::TokenStream>()
        .unwrap();
    let cfg = parse_repo_args(args).unwrap();
    assert_eq!(cfg.model_name.to_string(), "Post");
    assert!(cfg.hooks_type.is_some());
    assert_eq!(cfg.api_path.as_deref(), Some("/api/v1/posts"));
}
/// A model-only argument list leaves `api_path` unset.
#[test]
fn parse_repo_args_without_api() {
    let args = "Post".parse::<proc_macro2::TokenStream>().unwrap();
    let cfg = parse_repo_args(args).unwrap();
    assert!(cfg.api_path.is_none());
}
/// With `hooks` but no `commit_hooks`, the output calls the ordinary
/// before/after hooks and contains none of the durable commit-hook identifiers.
#[test]
fn hooked_repository_without_commit_hooks_uses_ordinary_hooks_only() {
    let generated = repository_macro(
        quote! { Post, hooks = PostHooks },
        quote! { pub trait PostRepository {} },
    )
    .to_string();

    let ordinary_hook_calls = [
        "self . hooks . before_create",
        "self . hooks . after_create",
        "self . hooks . before_update",
        "self . hooks . after_update",
    ];
    let all_ordinary_present = ordinary_hook_calls
        .iter()
        .all(|needle| generated.contains(*needle));
    assert!(
        all_ordinary_present,
        "ordinary hooks should still be generated: {generated}"
    );

    let durable_markers = [
        "enqueue_repository_commit_hook",
        "RepositoryCommitHookDescriptor",
        "__autumn_register_repository_commit_hooks",
    ];
    let any_durable_present = durable_markers
        .iter()
        .any(|needle| generated.contains(*needle));
    assert!(
        !any_durable_present,
        "ordinary hooks must not require or dispatch through the durable commit-hook queue: {generated}"
    );
}
/// Expands the repository macro for `Post` with `hooks = PostHooks` and
/// `commit_hooks = true`, returning the stringified token stream.
fn durable_hook_repository_tokens() -> String {
    let attr = quote! { Post, hooks = PostHooks, commit_hooks = true };
    let item = quote! { pub trait PostRepository {} };
    let expanded = repository_macro(attr, item);
    expanded.to_string()
}
/// With `commit_hooks = true`, the generated output must contain the durable
/// commit-hook identifiers listed below and must not contain the forbidden
/// ones. Every check is a substring match on the stringified token stream, so
/// needles are spelled with spaces between tokens.
#[test]
fn hooked_repository_commit_hooks_register_durable_runner_when_opted_in() {
let generated = durable_hook_repository_tokens();
// Idempotency context and key propagation.
assert!(
generated.contains("IdempotencyContext"),
"generated commit-hook repositories must extract the request idempotency context: {generated}"
);
assert!(
generated.contains("ctx . set_idempotency_key"),
"generated commit-hook repositories must seed MutationContext with the scoped idempotency key: {generated}"
);
assert!(
generated.contains("ctx . idempotency_key . as_deref ()"),
"generated commit-hook rows must use the scoped idempotency key for durable dedupe: {generated}"
);
assert!(
generated.contains("next_mutation_discriminator"),
"generated commit-hook rows must include a per-mutation idempotency discriminator: {generated}"
);
// Stage / finalize / mark-failed lifecycle helpers.
assert!(
generated.contains("enqueue_repository_commit_hook_pending_on_conn"),
"generated repositories must durably stage after_*_commit hooks before the mutation commits: {generated}"
);
assert!(
generated.contains("finalize_repository_commit_hook_after_hook"),
"generated repositories must only make staged after_*_commit hooks dispatchable after regular after hooks succeed: {generated}"
);
assert!(
generated.contains("mark_repository_commit_hook_after_hook_failed"),
"generated repositories must mark staged after_*_commit hooks non-dispatchable when regular after hooks fail: {generated}"
);
// Runner registration and handler-key components.
assert!(
generated.contains("RepositoryCommitHookDescriptor"),
"generated repositories must register hook runners at link time: {generated}"
);
assert!(
!generated.contains("register_after_commit"),
"generated after_*_commit hooks must not use the process-local callback registry"
);
assert!(
generated.contains("module_path ! ()"),
"durable hook handler keys must include the repository module path to avoid cross-module runner collisions: {generated}"
);
assert!(
generated.contains("env ! (\"CARGO_PKG_NAME\")"),
"durable hook handler keys should include a stable package/table/model identity: {generated}"
);
// Record encoding must go through the dedicated codec helpers.
assert!(
generated.contains("__autumn_commit_hook_to_value")
&& generated.contains("__autumn_commit_hook_from_value"),
"generated repository hook runners must use the framework's full-fidelity record codec: {generated}"
);
assert!(
!generated.contains("serde_json :: from_value (__record)"),
"generated repository hook runners must not rehydrate records through public serde JSON: {generated}"
);
// The regular after hooks must not be `?`-propagated.
assert!(
!generated.contains("self . hooks . after_create (& mut ctx , & record) . await ?"),
"after_create errors must be reported without rolling back the inserted record: {generated}"
);
assert!(
!generated.contains("self . hooks . after_update (& mut ctx , & record) . await ?"),
"after_update errors must be reported without rolling back the updated record: {generated}"
);
}
/// With commit hooks enabled, the output must wrap post-commit errors via
/// `__cache_committed_error_response` and contain both fail-closed log
/// messages (create and update).
#[test]
fn hooked_repository_commit_hook_post_commit_failures_are_idempotency_cacheable() {
    let generated = durable_hook_repository_tokens();

    let caches_committed_error =
        generated.contains("__cache_committed_error_response (__autumn_error)");
    assert!(
        caches_committed_error,
        "post-commit hook failures must be marked cacheable so idempotent retries do not duplicate the committed mutation: {generated}"
    );

    let fail_closed_logs = [
        "failed to finalize repository create commit hook after mutation commit; failing request closed",
        "failed to finalize repository update commit hook after mutation commit; failing request closed",
    ];
    let logs_both_paths = fail_closed_logs
        .iter()
        .all(|needle| generated.contains(*needle));
    assert!(
        logs_both_paths,
        "post-commit finalization failures should be logged as fail-closed outcomes: {generated}"
    );
}
/// Checks the relative ordering of create-path markers in the generated output:
/// stage < drop(conn) < after_create < finalize, and mark-failed < heartbeat
/// cancel. It also checks that the panic-handling identifiers are present.
///
/// Every position here is the FIRST occurrence in the whole output (`find`),
/// so the test relies on the create path's markers appearing before any other
/// path's identical markers.
#[test]
fn hooked_repository_create_commit_hooks_finalize_after_regular_after_hook() {
let generated = durable_hook_repository_tokens();
let create_stage = generated
.find(
"\"create\" , ctx . idempotency_key . as_deref () , __autumn_commit_hook_discriminator . as_deref () , & ctx , & __autumn_commit_hook_record",
)
.expect("create commit hook staging should use the encoded record");
let create_after = generated
.find("self . hooks . after_create (& mut ctx , & record)")
.expect("after_create hook should still be generated");
// NOTE(review): unlike the update test, this search is not restricted to the
// stage..after window; it takes the first `drop (conn)` anywhere in the output.
let create_drop_conn = generated
.find(":: core :: mem :: drop (conn)")
.expect("create path should release the repository connection before after/finalize");
let create_finalize = generated
.find("finalize_repository_commit_hook_after_hook (& self . pool , & __autumn_commit_hook_id")
.expect("create commit hook should be finalized after after_create succeeds");
assert!(
create_stage < create_after,
"create commit hook rows must be staged inside the mutation transaction: {generated}"
);
assert!(
create_stage < create_drop_conn && create_drop_conn < create_after,
"create path must release its checked-out connection before after_create/finalize checks out from the pool: {generated}"
);
assert!(
create_after < create_finalize,
"after_create_commit dispatch must see the finalized MutationContext from after_create: {generated}"
);
let create_failure_mark = generated
.find("mark_repository_commit_hook_after_hook_failed")
.expect("after_create failure path should mark the staged row as non-dispatchable");
let create_cancel = generated
.find("__autumn_pending_heartbeat . cancel ()")
.expect("after_create path should cancel the pending heartbeat");
// Presence-only check for the panic-handling identifiers.
assert!(
generated.contains("catch_repository_after_hook_unwind")
&& generated.contains(":: std :: panic :: resume_unwind")
&& generated.contains("after_create panicked")
&& generated.contains("__cache_committed_error_response"),
"idempotent after_create panics must cache a committed error while non-idempotent calls still unwind: {generated}"
);
assert!(
create_failure_mark < create_cancel,
"after_create failure must mark the staged row before canceling its heartbeat: {generated}"
);
}
/// Checks the relative ordering of update-path markers in the generated output:
/// stage < drop(conn) < after_update < finalize, and mark-failed < heartbeat
/// cancel. It also checks that the panic-handling identifiers are present.
///
/// Unlike a plain `find`, several lookups here are scoped (sub-slice search or
/// `rfind`) so they pick the update path's occurrence rather than an earlier
/// identical marker.
#[test]
fn hooked_repository_update_commit_hooks_finalize_after_regular_after_hook() {
let generated = durable_hook_repository_tokens();
let update_stage = generated
.find(
"\"update\" , ctx . idempotency_key . as_deref () , __autumn_commit_hook_discriminator . as_deref () , & ctx , & __autumn_commit_hook_record",
)
.expect("update commit hook staging should use the encoded record");
let update_after = generated
.find("self . hooks . after_update (& mut ctx , & record)")
.expect("after_update hook should still be generated");
// Search only between staging and the after hook; the sub-slice index is
// converted back to an absolute offset. This slice panics if
// `update_stage > update_after`, which is checked only afterwards.
let update_drop_conn = generated[update_stage..update_after]
.find(":: core :: mem :: drop (conn)")
.map(|idx| update_stage + idx)
.expect("update path should release the repository connection before after/finalize");
// `rfind`: take the LAST finalize call in the output.
let update_finalize = generated
.rfind("finalize_repository_commit_hook_after_hook (& self . pool , & __autumn_commit_hook_id")
.expect("update commit hook should be finalized after after_update succeeds");
assert!(
update_stage < update_after,
"update commit hook rows must be staged inside the mutation transaction: {generated}"
);
assert!(
update_stage < update_drop_conn && update_drop_conn < update_after,
"update path must release its checked-out connection before after_update/finalize checks out from the pool: {generated}"
);
assert!(
update_after < update_finalize,
"after_update_commit dispatch must see the finalized MutationContext from after_update: {generated}"
);
// Both lookups start at the after_update call and are made absolute again.
let update_failure_mark = generated[update_after..]
.find("mark_repository_commit_hook_after_hook_failed")
.map(|idx| update_after + idx)
.expect("after_update failure path should mark the staged row as non-dispatchable");
let update_cancel = generated[update_after..]
.find("__autumn_pending_heartbeat . cancel ()")
.map(|idx| update_after + idx)
.expect("after_update path should cancel the pending heartbeat");
assert!(
update_failure_mark < update_cancel,
"after_update failure must mark the staged row before canceling its heartbeat: {generated}"
);
// Presence-only check for the panic-handling identifiers.
assert!(
generated.contains("after_update panicked")
&& generated.contains("__cache_committed_error_response")
&& generated.contains(":: std :: panic :: resume_unwind"),
"idempotent after_update panics must cache a committed error while non-idempotent calls still unwind: {generated}"
);
}
/// In the commit-hook output, `. for_update ()` must appear before
/// `before_delete` within the text following the first `MutationOp :: Delete`,
/// and the output must bind and zero-check `__autumn_deleted`.
#[test]
fn hooked_repository_delete_commit_hooks_lock_and_check_deleted_count() {
let generated = durable_hook_repository_tokens();
// Everything from the first `MutationOp :: Delete` to the end of the output
// is treated as the delete path.
let delete_start = generated
.find("MutationOp :: Delete")
.expect("delete path should still be generated");
let delete_generated = &generated[delete_start..];
let delete_lock = delete_generated
.find(". for_update ()")
.expect("delete path should lock the row before before_delete");
let before_delete = delete_generated
.find("before_delete")
.expect("before_delete hook should still be generated");
assert!(
delete_lock < before_delete,
"delete path must lock the row before invoking before_delete: {generated}"
);
// These two checks run against the whole output, not just the delete tail.
assert!(
generated.contains("let __autumn_deleted =")
&& generated.contains("if __autumn_deleted == 0"),
"delete path must not enqueue after_delete_commit when no row was deleted: {generated}"
);
}
/// For a hooked, versioned repository, the text following
/// `let inputs_ref = & inputs` must contain, in order: the chunk insert, the
/// version-history INSERT, and `inserted . extend (chunk_inserted)`. It must
/// also reference `ctx . actor . as_deref`.
#[test]
fn hooked_repository_versioned_save_many_writes_history_before_extending_results() {
let generated = repository_macro(
quote! { Post, hooks = PostHooks, versioned = true },
quote! { pub trait PostRepository {} },
)
.to_string();
// Anchor on a statement the test expects in the hooked save_many body; all
// later positions are relative to this anchor.
let save_many_pos = generated
.find("let inputs_ref = & inputs")
.expect("hooked save_many should keep a reference to prepared inputs");
let section = &generated[save_many_pos..];
let insert_pos = section
.find("let chunk_inserted =")
.expect("hooked save_many should insert chunks");
let history_pos = section
.find("INSERT INTO _autumn_version_history")
.expect("hooked versioned save_many must write create history");
let extend_pos = section
.find("inserted . extend (chunk_inserted)")
.expect("hooked save_many should extend inserted results");
assert!(
insert_pos < history_pos && history_pos < extend_pos,
"hooked versioned save_many must record history for each inserted record before moving chunk results: {section}"
);
assert!(
section.contains("ctx . actor . as_deref"),
"hooked versioned save_many history should use the per-record MutationContext: {section}"
);
}
/// For a versioned repository with commit hooks, the text following
/// `let (inserted_records , hook_infos , global_indices)` must contain, in
/// order: the chunk insert, the version-history INSERT, and the bulk
/// commit-hook enqueue call.
#[test]
fn hooked_commit_repository_versioned_save_many_writes_history_before_enqueuing_hooks() {
let generated = repository_macro(
quote! { Post, hooks = PostHooks, commit_hooks = true, versioned = true },
quote! { pub trait PostRepository {} },
)
.to_string();
// Anchor on the tuple binding the test expects in the commit-hook save_many
// body; all later positions are relative to this anchor.
let save_many_pos = generated
.find("let (inserted_records , hook_infos , global_indices)")
.expect("commit-hook save_many should collect inserted records and hook metadata");
let section = &generated[save_many_pos..];
let insert_pos = section
.find("let chunk_inserted =")
.expect("commit-hook save_many should insert chunks");
let history_pos = section
.find("INSERT INTO _autumn_version_history")
.expect("commit-hook versioned save_many must write create history");
let enqueue_pos = section
.find("enqueue_repository_commit_hooks_pending_bulk_on_conn")
.expect("commit-hook save_many should enqueue create hooks");
assert!(
insert_pos < history_pos && history_pos < enqueue_pos,
"commit-hook versioned save_many must record history inside the mutation transaction before hook enqueue: {section}"
);
}
/// For a hooked, versioned repository, the text from `async fn delete_many`
/// onward must contain a version-history INSERT that appears after the first
/// `diesel :: delete` (or, failing that, the first `diesel :: update`).
#[test]
fn hooked_repository_versioned_delete_many_writes_history() {
let generated = repository_macro(
quote! { Post, hooks = PostHooks, versioned = true },
quote! { pub trait PostRepository {} },
)
.to_string();
// The section runs from `delete_many` to the END of the output, so matches
// may come from code generated after that method.
let delete_many_pos = generated
.find("async fn delete_many")
.expect("hooked repository should implement delete_many");
let section = &generated[delete_many_pos..];
let history_pos = section
.find("INSERT INTO _autumn_version_history")
.expect("hooked versioned delete_many must write delete history");
// Prefer a hard delete; fall back to an update statement if none is found.
let delete_pos = section
.find("diesel :: delete")
.or_else(|| section.find("diesel :: update"))
.expect("hooked versioned delete_many must delete/update records");
assert!(
delete_pos < history_pos,
"hooked versioned delete_many must record history after database deletion/update: {section}"
);
}
/// A single capitalised word is simply lower-cased.
#[test]
fn snake_case_simple() {
    let converted = to_snake_case("Bookmark");
    assert_eq!(converted, "bookmark");
}
/// An interior uppercase letter starts a new `_`-separated word.
#[test]
fn snake_case_multi_word() {
    let converted = to_snake_case("PageRevision");
    assert_eq!(converted, "page_revision");
}
/// An all-lowercase input is returned unchanged.
#[test]
fn snake_case_already_lower() {
    let converted = to_snake_case("widget");
    assert_eq!(converted, "widget");
}
/// `cursor_key = <ident>` is stored as a string and leaves `cursor_key_type`
/// unset.
#[test]
fn parse_repo_args_with_cursor_key() {
    let args = "Post, cursor_key = created_at"
        .parse::<proc_macro2::TokenStream>()
        .unwrap();
    let cfg = parse_repo_args(args).unwrap();
    assert_eq!(cfg.model_name.to_string(), "Post");
    let key = cfg.cursor_key.as_deref();
    assert_eq!(
        key,
        Some("created_at"),
        "cursor_key attribute must be stored on RepoConfig"
    );
    assert!(
        cfg.cursor_key_type.is_none(),
        "cursor_key_type must be None when not specified"
    );
}
/// `cursor_key_type = <path>` is parsed as a path whose segments, joined with
/// `::`, reproduce the written type.
#[test]
fn parse_repo_args_with_cursor_key_and_type() {
    let args = "Post, cursor_key = created_at, cursor_key_type = chrono::NaiveDateTime"
        .parse::<proc_macro2::TokenStream>()
        .unwrap();
    let cfg = parse_repo_args(args).unwrap();
    assert_eq!(cfg.cursor_key.as_deref(), Some("created_at"));
    assert!(
        cfg.cursor_key_type.is_some(),
        "cursor_key_type must be parsed when specified"
    );
    // Rebuild the textual path from its segment identifiers.
    let joined_path = cfg.cursor_key_type.as_ref().map(|path| {
        let segments = path
            .segments
            .iter()
            .map(|segment| segment.ident.to_string())
            .collect::<Vec<_>>();
        segments.join("::")
    });
    assert_eq!(joined_path, Some("chrono::NaiveDateTime".to_string()),);
}
/// `api` and `cursor_key` can be combined in one argument list.
#[test]
fn parse_repo_args_cursor_key_combined_with_api() {
    let args = r#"Post, api = "/api/posts", cursor_key = created_at"#
        .parse::<proc_macro2::TokenStream>()
        .unwrap();
    let cfg = parse_repo_args(args).unwrap();
    assert_eq!(cfg.api_path.as_deref(), Some("/api/posts"));
    assert_eq!(cfg.cursor_key.as_deref(), Some("created_at"));
}
/// The default expansion mentions `fn page`, `PageRequest`, `Page <` and
/// `order`. These are plain substring checks on the stringified output.
#[test]
fn repository_macro_generates_page_method_in_trait_and_impl() {
    let attr = quote! { Post };
    let item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(attr, item).to_string();

    let has_page_fn = generated.contains("fn page");
    assert!(
        has_page_fn,
        "repository macro must generate a page() method in the trait: {generated}"
    );
    let takes_page_request = generated.contains("PageRequest");
    assert!(
        takes_page_request,
        "page() method must accept a PageRequest parameter: {generated}"
    );
    let returns_page = generated.contains("Page <");
    assert!(
        returns_page,
        "page() method must return Page<Model> in the trait: {generated}"
    );
    let mentions_order = generated.contains("order");
    assert!(
        mentions_order,
        "page() method must include ORDER BY for deterministic results: {generated}"
    );
}
/// With `cursor_key` set, the output mentions `fn cursor_page`,
/// `CursorRequest`, `CursorPage <` and an `i64` cursor decode.
#[test]
fn repository_macro_generates_cursor_page_when_cursor_key_set() {
    let attr = quote! { Post, cursor_key = created_at };
    let item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(attr, item).to_string();

    let has_cursor_page = generated.contains("fn cursor_page");
    assert!(
        has_cursor_page,
        "cursor_key attribute must cause cursor_page() to be generated: {generated}"
    );
    let takes_cursor_request = generated.contains("CursorRequest");
    assert!(
        takes_cursor_request,
        "cursor_page() must accept a CursorRequest parameter: {generated}"
    );
    let returns_cursor_page = generated.contains("CursorPage <");
    assert!(
        returns_cursor_page,
        "cursor_page() must return CursorPage<Model>: {generated}"
    );
    // Accept either token-spaced or compact spelling of the turbofish.
    let decodes_i64 = ["decode :: < i64 >", "decode::<i64>"]
        .iter()
        .any(|needle| generated.contains(*needle));
    assert!(
        decodes_i64,
        "cursor_page() without cursor_key_type must use id-only cursor: {generated}"
    );
}
/// With `cursor_key` and `cursor_key_type` set, the output mentions
/// `fn cursor_page`, the substrings `lt`, `eq` and `and`, and `NaiveDateTime`.
#[test]
fn repository_macro_generates_two_part_filter_when_cursor_key_type_set() {
    let attr = quote! { Post, cursor_key = created_at, cursor_key_type = chrono::NaiveDateTime };
    let item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(attr, item).to_string();

    let has_cursor_page = generated.contains("fn cursor_page");
    assert!(
        has_cursor_page,
        "cursor_key + cursor_key_type must generate cursor_page(): {generated}"
    );
    // Bare substring checks: any occurrence of these letter sequences counts.
    let has_keyset_ops = ["lt", "eq", "and"]
        .iter()
        .all(|needle| generated.contains(*needle));
    assert!(
        has_keyset_ops,
        "cursor_key_type must cause a two-part keyset filter: {generated}"
    );
    let mentions_key_type = generated.contains("NaiveDateTime");
    assert!(
        mentions_key_type,
        "cursor_key_type must appear in the decode call: {generated}"
    );
}
/// Without `cursor_key`, the output never mentions `cursor_page`.
#[test]
fn repository_macro_does_not_generate_cursor_page_without_cursor_key() {
    let attr = quote! { Post };
    let item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(attr, item).to_string();
    let mentions_cursor_page = generated.contains("cursor_page");
    assert!(
        !mentions_cursor_page,
        "cursor_page() must only be generated when cursor_key is declared: {generated}"
    );
}
/// The bare `soft_delete` flag sets `soft_delete` on the config.
#[test]
fn parse_repo_args_recognizes_soft_delete_flag() {
    let args = "Post, soft_delete"
        .parse::<proc_macro2::TokenStream>()
        .unwrap();
    let cfg = parse_repo_args(args).unwrap();
    assert_eq!(cfg.model_name.to_string(), "Post");
    assert!(
        cfg.soft_delete,
        "soft_delete flag must be stored on RepoConfig"
    );
}
/// Without the flag, `soft_delete` is `false`.
#[test]
fn soft_delete_config_is_false_by_default() {
    let args = "Post".parse::<proc_macro2::TokenStream>().unwrap();
    let cfg = parse_repo_args(args).unwrap();
    assert!(
        !cfg.soft_delete,
        "soft_delete must default to false when not specified"
    );
}
/// `soft_delete` and `api` can be combined in one argument list.
#[test]
fn soft_delete_combined_with_api() {
    let args = r#"Post, soft_delete, api = "/api/posts""#
        .parse::<proc_macro2::TokenStream>()
        .unwrap();
    let cfg = parse_repo_args(args).unwrap();
    assert!(cfg.soft_delete);
    assert_eq!(cfg.api_path.as_deref(), Some("/api/posts"));
}
/// `soft_delete` and `hooks` can be combined in one argument list.
#[test]
fn soft_delete_combined_with_hooks() {
    let args = "Post, soft_delete, hooks = PostHooks"
        .parse::<proc_macro2::TokenStream>()
        .unwrap();
    let cfg = parse_repo_args(args).unwrap();
    assert!(cfg.soft_delete);
    assert!(cfg.hooks_type.is_some());
}
/// With `soft_delete`, the output mentions `fn restore` and `fn purge`.
#[test]
fn repository_macro_soft_delete_generates_restore_and_purge_methods() {
    let attr = quote! { Post, soft_delete };
    let item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(attr, item).to_string();

    let has_restore = generated.contains("fn restore");
    assert!(
        has_restore,
        "soft_delete must generate a restore() method in the trait: {generated}"
    );
    let has_purge = generated.contains("fn purge");
    assert!(
        has_purge,
        "soft_delete must generate a purge() method in the trait: {generated}"
    );
}
/// With `soft_delete`, the output mentions `fn with_deleted` and
/// `fn only_deleted`.
#[test]
fn repository_macro_soft_delete_generates_with_deleted_and_only_deleted() {
    let attr = quote! { Post, soft_delete };
    let item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(attr, item).to_string();

    let has_with_deleted = generated.contains("fn with_deleted");
    assert!(
        has_with_deleted,
        "soft_delete must generate a with_deleted() method: {generated}"
    );
    let has_only_deleted = generated.contains("fn only_deleted");
    assert!(
        has_only_deleted,
        "soft_delete must generate an only_deleted() method: {generated}"
    );
}
/// With `soft_delete`, the output references `deleted_at`, and the number of
/// `diesel :: delete` occurrences is at most one more than the number of
/// `fn purge` occurrences.
#[test]
fn repository_macro_soft_delete_delete_uses_update_not_hard_delete() {
    let attr = quote! { Post, soft_delete };
    let item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(attr, item).to_string();

    let references_deleted_at = generated.contains("deleted_at");
    assert!(
        references_deleted_at,
        "soft_delete delete_by_id must reference deleted_at column: {generated}"
    );

    // Count-based heuristic: hard deletes are bounded by the purge definitions.
    let hard_deletes = generated.matches("diesel :: delete").count();
    let purge_fns = generated.matches("fn purge").count();
    let allowed_hard_deletes = purge_fns + 1;
    assert!(
        hard_deletes <= allowed_hard_deletes,
        "soft_delete delete_by_id must not issue a hard DELETE; only purge() should: {generated}"
    );
}
/// With `soft_delete`, the output mentions both `deleted_at` and `is_null`.
#[test]
fn repository_macro_soft_delete_find_all_filters_deleted_at_is_null() {
    let attr = quote! { Post, soft_delete };
    let item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(attr, item).to_string();
    let filters_live_rows = ["deleted_at", "is_null"]
        .iter()
        .all(|needle| generated.contains(*needle));
    assert!(
        filters_live_rows,
        "soft_delete find_all must filter rows where deleted_at IS NULL: {generated}"
    );
}
/// Without the `soft_delete` flag the expansion must contain neither `fn restore`
/// nor `fn purge`.
#[test]
fn repository_macro_without_soft_delete_does_not_generate_restore_or_purge() {
    let repo_args = quote! { Post };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let has_restore = generated.contains("fn restore");
    assert!(
        !has_restore,
        "non-soft-delete repository must not generate restore(): {generated}"
    );

    let has_purge = generated.contains("fn purge");
    assert!(
        !has_purge,
        "non-soft-delete repository must not generate purge(): {generated}"
    );
}
/// The `Post, soft_delete` expansion must contain a `diesel :: delete` call.
#[test]
fn repository_macro_soft_delete_purge_issues_hard_delete() {
    let repo_args = quote! { Post, soft_delete };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let issues_hard_delete = generated.contains("diesel :: delete");
    assert!(
        issues_hard_delete,
        "purge() must issue a hard DELETE FROM: {generated}"
    );
}
/// The `Post, soft_delete` expansion must contain `is_not_null`.
#[test]
fn repository_macro_soft_delete_only_deleted_filters_is_not_null() {
    let repo_args = quote! { Post, soft_delete };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let filters_not_null = generated.contains("is_not_null");
    assert!(
        filters_not_null,
        "only_deleted() must filter rows where deleted_at IS NOT NULL: {generated}"
    );
}
/// In the `Post, soft_delete` expansion, `is_null` must appear within the first
/// 400 characters starting at `async fn find_by_id`.
#[test]
fn repository_macro_soft_delete_find_by_id_excludes_deleted_rows() {
    let generated = repository_macro(
        quote! { Post, soft_delete },
        quote! { pub trait PostRepository {} },
    )
    .to_string();
    let find_by_id = generated
        .find("async fn find_by_id")
        .expect("find_by_id must be generated");
    // Clamp the inspection window to the available text, on a char boundary, so a short
    // expansion reaches the assertion below instead of panicking on an out-of-range slice.
    let tail = &generated[find_by_id..];
    let window_end = tail
        .char_indices()
        .nth(400)
        .map_or(tail.len(), |(idx, _)| idx);
    let section = &tail[..window_end];
    assert!(
        section.contains("is_null"),
        "find_by_id must filter deleted_at IS NULL: {section}"
    );
}
/// In the `Post, soft_delete` expansion, `is_null` must appear within the first
/// 400 characters starting at `async fn find_all`.
#[test]
fn repository_macro_soft_delete_find_all_impl_filters_deleted_rows() {
    let generated = repository_macro(
        quote! { Post, soft_delete },
        quote! { pub trait PostRepository {} },
    )
    .to_string();
    let find_all = generated
        .find("async fn find_all")
        .expect("find_all must be generated");
    // Clamp the inspection window to the available text, on a char boundary, so a short
    // expansion reaches the assertion below instead of panicking on an out-of-range slice.
    let tail = &generated[find_all..];
    let window_end = tail
        .char_indices()
        .nth(400)
        .map_or(tail.len(), |(idx, _)| idx);
    let section = &tail[..window_end];
    assert!(
        section.contains("is_null"),
        "find_all impl must filter deleted_at IS NULL: {section}"
    );
}
/// In the `Post, soft_delete` expansion, `is_null` must appear within the first
/// 400 characters starting at `async fn count`.
#[test]
fn repository_macro_soft_delete_count_filters_deleted_rows() {
    let generated = repository_macro(
        quote! { Post, soft_delete },
        quote! { pub trait PostRepository {} },
    )
    .to_string();
    let count_pos = generated
        .find("async fn count")
        .expect("count must be generated");
    // Clamp the inspection window to the available text, on a char boundary, so a short
    // expansion reaches the assertion below instead of panicking on an out-of-range slice.
    let tail = &generated[count_pos..];
    let window_end = tail
        .char_indices()
        .nth(400)
        .map_or(tail.len(), |(idx, _)| idx);
    let section = &tail[..window_end];
    assert!(
        section.contains("is_null"),
        "count impl must filter deleted_at IS NULL: {section}"
    );
}
/// In the `Post, soft_delete` expansion, `is_null` must appear within the first
/// 800 characters starting at `async fn exists_by_id`.
#[test]
fn repository_macro_soft_delete_exists_by_id_filters_deleted_rows() {
    let generated = repository_macro(
        quote! { Post, soft_delete },
        quote! { pub trait PostRepository {} },
    )
    .to_string();
    let exists_pos = generated
        .find("async fn exists_by_id")
        .expect("exists_by_id must be generated");
    // Clamp the inspection window to the available text, on a char boundary, so a short
    // expansion reaches the assertion below instead of panicking on an out-of-range slice.
    let tail = &generated[exists_pos..];
    let window_end = tail
        .char_indices()
        .nth(800)
        .map_or(tail.len(), |(idx, _)| idx);
    let section = &tail[..window_end];
    assert!(
        section.contains("is_null"),
        "exists_by_id impl must filter deleted_at IS NULL: {section}"
    );
}
/// In the `Post, soft_delete` expansion, `is_null` must appear within the first
/// 600 characters starting at `async fn delete_by_id`.
#[test]
fn repository_macro_soft_delete_delete_by_id_targets_only_non_deleted() {
    let generated = repository_macro(
        quote! { Post, soft_delete },
        quote! { pub trait PostRepository {} },
    )
    .to_string();
    let delete_pos = generated
        .find("async fn delete_by_id")
        .expect("delete_by_id must be generated");
    // Clamp the inspection window to the available text, on a char boundary, so a short
    // expansion reaches the assertion below instead of panicking on an out-of-range slice.
    let tail = &generated[delete_pos..];
    let window_end = tail
        .char_indices()
        .nth(600)
        .map_or(tail.len(), |(idx, _)| idx);
    let section = &tail[..window_end];
    assert!(
        section.contains("is_null"),
        "delete_by_id soft-delete UPDATE must add deleted_at IS NULL guard: {section}"
    );
}
/// In the `Post, soft_delete, hooks = PostHooks` expansion, `is_null` must occur
/// somewhere before the first `for_update`.
#[test]
fn repository_macro_soft_delete_hooked_prefetch_applies_sd_filter() {
    let repo_args = quote! { Post, soft_delete, hooks = PostHooks };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let first_lock = generated
        .find("for_update")
        .expect("hooked delete must generate a for_update prefetch");
    // Everything emitted ahead of the first `for_update` occurrence.
    let preamble = &generated[..first_lock];
    let filter_precedes_lock = preamble.contains("is_null");
    assert!(
        filter_precedes_lock,
        "hooked prefetch must apply deleted_at IS NULL before for_update: {preamble}"
    );
}
/// For a trait declaring `delete_by_title`, the `Post, soft_delete` expansion must
/// mention `deleted_at` within the first 800 characters starting at
/// `async fn delete_by_title`.
#[test]
fn repository_macro_soft_delete_derived_delete_uses_update_not_hard_delete() {
    let generated = repository_macro(
        quote! { Post, soft_delete },
        quote! { pub trait PostRepository {
            fn delete_by_title(title: String);
        } },
    )
    .to_string();
    let impl_delete = generated
        .find("async fn delete_by_title")
        .expect("delete_by_title impl must be generated");
    // Clamp the inspection window to the available text, on a char boundary, so a short
    // expansion reaches the assertion below instead of panicking on an out-of-range slice.
    let tail = &generated[impl_delete..];
    let window_end = tail
        .char_indices()
        .nth(800)
        .map_or(tail.len(), |(idx, _)| idx);
    let section = &tail[..window_end];
    assert!(
        section.contains("deleted_at"),
        "derived delete_by_title must reference deleted_at in soft-delete mode: {section}"
    );
}
/// In the `Post, soft_delete, cursor_key = created_at` expansion, `is_null` must
/// appear within the first 800 characters starting at `async fn cursor_page`.
#[test]
fn repository_macro_soft_delete_cursor_page_applies_sd_filter() {
    let generated = repository_macro(
        quote! { Post, soft_delete, cursor_key = created_at },
        quote! { pub trait PostRepository {} },
    )
    .to_string();
    let cursor_pos = generated
        .find("async fn cursor_page")
        .expect("cursor_page impl must be generated");
    // Clamp the inspection window to the available text, on a char boundary, so a short
    // expansion reaches the assertion below instead of panicking on an out-of-range slice.
    let tail = &generated[cursor_pos..];
    let window_end = tail
        .char_indices()
        .nth(800)
        .map_or(tail.len(), |(idx, _)| idx);
    let section = &tail[..window_end];
    assert!(
        section.contains("is_null"),
        "cursor_page impl must apply deleted_at IS NULL filter in soft-delete mode: {section}"
    );
}
/// The `Post, soft_delete` expansion must contain `page_only_deleted`.
#[test]
fn repository_macro_soft_delete_generates_page_only_deleted_method() {
    let repo_args = quote! { Post, soft_delete };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let has_page_only_deleted = generated.contains("page_only_deleted");
    assert!(
        has_page_only_deleted,
        "soft_delete must generate a page_only_deleted() method: {generated}"
    );
}
/// Parsing `Post, tenant_scoped` must yield model name `Post` with
/// `tenant_scoped` set.
#[test]
fn parse_repo_args_recognizes_tenant_scoped_flag() {
    let tokens = "Post, tenant_scoped"
        .parse::<proc_macro2::TokenStream>()
        .unwrap();
    let config = parse_repo_args(tokens).unwrap();

    let model = config.model_name.to_string();
    assert_eq!(model, "Post");

    let tenant_scoped = config.tenant_scoped;
    assert!(
        tenant_scoped,
        "tenant_scoped flag must be stored on RepoConfig"
    );
}
/// Parsing a bare `Post` must leave `tenant_scoped` unset.
#[test]
fn tenant_scoped_config_is_false_by_default() {
    let tokens = "Post".parse::<proc_macro2::TokenStream>().unwrap();
    let config = parse_repo_args(tokens).unwrap();

    let tenant_scoped = config.tenant_scoped;
    assert!(
        !tenant_scoped,
        "tenant_scoped must default to false when not specified"
    );
}
/// The `Post, tenant_scoped` expansion must contain `across_tenants`.
#[test]
fn repository_macro_tenant_scoped_generates_across_tenants() {
    let repo_args = quote! { Post, tenant_scoped };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let has_across_tenants = generated.contains("across_tenants");
    assert!(
        has_across_tenants,
        "tenant_scoped must generate an across_tenants() method on the struct: {generated}"
    );
}
/// Parsing `Post, versioned = true` must yield model name `Post` with
/// `versioned` set.
#[test]
fn parse_repo_args_recognizes_versioned_flag() {
    let tokens = "Post, versioned = true"
        .parse::<proc_macro2::TokenStream>()
        .unwrap();
    let config = parse_repo_args(tokens).unwrap();

    let model = config.model_name.to_string();
    assert_eq!(model, "Post");

    let versioned = config.versioned;
    assert!(
        versioned,
        "versioned = true flag must be stored on RepoConfig"
    );
}
/// Parsing a bare `Post` must leave `versioned` unset.
#[test]
fn versioned_config_is_false_by_default() {
    let tokens = "Post".parse::<proc_macro2::TokenStream>().unwrap();
    let config = parse_repo_args(tokens).unwrap();

    let versioned = config.versioned;
    assert!(
        !versioned,
        "versioned must default to false when not specified"
    );
}
/// Parsing `Post, versioned = false` must leave `versioned` unset.
#[test]
fn parse_repo_args_versioned_false_explicitly() {
    let tokens = "Post, versioned = false"
        .parse::<proc_macro2::TokenStream>()
        .unwrap();
    let config = parse_repo_args(tokens).unwrap();

    let versioned = config.versioned;
    assert!(!versioned, "versioned = false must remain false");
}
/// The `Post, versioned = true` expansion must contain `version_history` or
/// `versioned`.
#[test]
fn repository_macro_versioned_generates_history_method() {
    let repo_args = quote! { Post, versioned = true };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let mentions_versioning = ["version_history", "versioned"]
        .iter()
        .any(|needle| generated.contains(*needle));
    assert!(
        mentions_versioning,
        "versioned = true must generate version-history-related code: {generated}"
    );
}
/// The `Post, versioned = true` expansion must contain
/// `VersionedRepositoryDescriptor`.
#[test]
fn repository_macro_versioned_registers_framework_migration_descriptor() {
    let repo_args = quote! { Post, versioned = true };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let registers_descriptor = generated.contains("VersionedRepositoryDescriptor");
    assert!(
        registers_descriptor,
        "versioned repositories must register a link-time descriptor so app startup installs the version-history migration: {generated}"
    );
}
/// The `Post, versioned = true` expansion must contain `VersionedRecord`,
/// `version_table_name`, and `version_sensitive_columns`.
#[test]
fn repository_macro_versioned_generates_versioned_record_impl() {
    let repo_args = quote! { Post, versioned = true };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let has_trait_name = generated.contains("VersionedRecord");
    assert!(
        has_trait_name,
        "versioned = true must generate impl VersionedRecord for Post: {generated}"
    );

    let has_table_name = generated.contains("version_table_name");
    assert!(
        has_table_name,
        "generated VersionedRecord impl must include version_table_name: {generated}"
    );

    let has_sensitive_columns = generated.contains("version_sensitive_columns");
    assert!(
        has_sensitive_columns,
        "generated VersionedRecord impl must include version_sensitive_columns: {generated}"
    );
}
/// The `Post, tenant_scoped, versioned = true` expansion must contain all of
/// `tenant_id, record_id`, `__vh_tenant_id`, and `version_tenant_id`.
#[test]
fn repository_macro_tenant_scoped_versioned_stores_tenant_id_in_history() {
    let repo_args = quote! { Post, tenant_scoped, versioned = true };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let required_fragments = ["tenant_id, record_id", "__vh_tenant_id", "version_tenant_id"];
    let persists_tenant_id = required_fragments
        .iter()
        .all(|needle| generated.contains(*needle));
    assert!(
        persists_tenant_id,
        "tenant-scoped history writes must persist tenant_id for fail-closed history reads: {generated}"
    );
}
/// The `Post, tenant_scoped, versioned = true` expansion must contain all of
/// `CURRENT_TENANT`, `tenant_id = $3`, and `self . across_tenants`.
#[test]
fn repository_macro_tenant_scoped_version_history_filters_current_tenant() {
    let repo_args = quote! { Post, tenant_scoped, versioned = true };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let required_fragments = ["CURRENT_TENANT", "tenant_id = $3", "self . across_tenants"];
    let filters_by_tenant = required_fragments
        .iter()
        .all(|needle| generated.contains(*needle));
    assert!(
        filters_by_tenant,
        "tenant-scoped version_history must default to CURRENT_TENANT and only bypass through across_tenants(): {generated}"
    );
}
/// Starting at `} else { load_query` in the `Post, versioned = true` expansion,
/// the first `. for_update ()` must precede the first `. first :: < Post >`,
/// which must precede the first `INSERT INTO _autumn_version_history`.
#[test]
fn repository_macro_versioned_update_locks_before_history_diff_without_expected_version() {
    let repo_args = quote! { Post, versioned = true };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let branch_start = generated
        .find("} else { load_query")
        .expect("versioned update should have a no-expected-version load branch");
    let section = &generated[branch_start..];

    // All three offsets are relative to `section`, so they are directly comparable.
    let lock_at = section
        .find(". for_update ()")
        .expect("versioned update must lock the row before computing history diff");
    let load_at = section
        .find(". first :: < Post >")
        .expect("versioned update should load the row before applying the update");
    let history_at = section
        .find("INSERT INTO _autumn_version_history")
        .expect("versioned update should write history");

    let in_order = lock_at < load_at && load_at < history_at;
    assert!(
        in_order,
        "versioned update must SELECT FOR UPDATE before loading the before image and writing history: {section}"
    );
}
/// Starting at the first `pg_advisory_xact_lock` in the `Post, versioned = true`
/// expansion, `let existing_rows =` must precede `let __vh_before_map`, which must
/// precede `INSERT INTO _autumn_version_history`; the expansion must also contain
/// `repository_upsert_advisory_lock_key`.
#[test]
fn repository_macro_versioned_upsert_many_locks_keys_before_history_snapshot() {
    let repo_args = quote! { Post, versioned = true };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let first_lock = generated
        .find("pg_advisory_xact_lock")
        .expect("versioned upsert_many must lock logical upsert keys before pre-reading rows");
    // Searching only the text from the lock onward means every match below follows the lock.
    let upsert_section = &generated[first_lock..];

    let load_at = upsert_section
        .find("let existing_rows =")
        .expect("versioned upsert_many should load existing rows before writing history");
    let snapshot_at = upsert_section
        .find("let __vh_before_map")
        .expect("versioned upsert_many should snapshot before images for history");
    let history_at = upsert_section
        .find("INSERT INTO _autumn_version_history")
        .expect("versioned upsert_many should write history entries");

    let in_order = load_at < snapshot_at && snapshot_at < history_at;
    assert!(
        in_order,
        "versioned upsert_many must serialize keys before classifying insert/update history: {generated}"
    );

    let uses_lock_key_helper = generated.contains("repository_upsert_advisory_lock_key");
    assert!(
        uses_lock_key_helper,
        "versioned upsert_many must derive stable advisory lock keys through the runtime helper: {generated}"
    );
}
/// In the `Post, versioned = true` expansion, the first `pg_advisory_xact_lock`
/// must precede the first `records . chunks (chunk_size)`, and the expansion must
/// contain `records . iter () . map (| r | r . id)`.
#[test]
fn repository_macro_versioned_upsert_many_locks_all_keys_before_chunk_loop() {
    let repo_args = quote! { Post, versioned = true };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let first_lock = generated
        .find("pg_advisory_xact_lock")
        .expect("versioned upsert_many must acquire advisory locks");
    let chunk_loop = generated
        .find("records . chunks (chunk_size)")
        .expect("versioned upsert_many should still chunk database writes");

    let locks_first = first_lock < chunk_loop;
    assert!(
        locks_first,
        "versioned upsert_many must acquire all advisory locks in global sorted order before chunking: {generated}"
    );

    let collects_all_ids = generated.contains("records . iter () . map (| r | r . id)");
    assert!(
        collects_all_ids,
        "versioned upsert_many lock set must be collected from all input records, not the current chunk: {generated}"
    );
}
/// The `Post, tenant_scoped` expansion must contain the guard
/// `! has_lock && upserted . len () != records . len ()`.
#[test]
fn repository_macro_tenant_scoped_upsert_many_rejects_partial() {
    let repo_args = quote! { Post, tenant_scoped };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let has_partial_guard =
        generated.contains("! has_lock && upserted . len () != records . len ()");
    assert!(
        has_partial_guard,
        "tenant-scoped upsert_many must reject partial upserts when lock versioning is absent: {generated}"
    );
}
/// The `Post, tenant_scoped, versioned = true` expansion must not contain
/// `self . tenant_id . as_str` and must contain
/// `VersionTenantIdValue :: version_tenant_id (& self . tenant_id)`.
#[test]
fn repository_macro_tenant_scoped_versioned_tenant_id_handles_optional_string() {
    let repo_args = quote! { Post, tenant_scoped, versioned = true };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let calls_as_str = generated.contains("self . tenant_id . as_str");
    assert!(
        !calls_as_str,
        "tenant-scoped VersionedRecord must not assume tenant_id is a bare String: {generated}"
    );

    let delegates_to_helper =
        generated.contains("VersionTenantIdValue :: version_tenant_id (& self . tenant_id)");
    assert!(
        delegates_to_helper,
        "tenant-scoped VersionedRecord must delegate tenant_id extraction to the runtime helper: {generated}"
    );
}
/// With `#[version_history(sensitive = ["password_digest", "reset_token"])]` on the
/// trait, the `Post, versioned = true` expansion must contain both column names.
#[test]
fn repository_macro_versioned_sensitive_columns_parsed_from_attr() {
    let repo_args = quote! { Post, versioned = true };
    let trait_item = quote! {
        #[version_history(sensitive = ["password_digest", "reset_token"])]
        pub trait PostRepository {}
    };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let has_password_digest = generated.contains("password_digest");
    assert!(
        has_password_digest,
        "sensitive columns must appear in the generated VersionedRecord impl: {generated}"
    );

    let has_reset_token = generated.contains("reset_token");
    assert!(
        has_reset_token,
        "all sensitive columns must appear in the generated VersionedRecord impl: {generated}"
    );
}
/// With two separate `#[version_history(sensitive = [...])]` attributes on the
/// trait, the expansion must contain the column named by each of them.
#[test]
fn repository_macro_versioned_sensitive_columns_merged_from_multiple_attrs() {
    let repo_args = quote! { Post, versioned = true };
    let trait_item = quote! {
        #[version_history(sensitive = ["password_digest"])]
        #[version_history(sensitive = ["api_key"])]
        pub trait PostRepository {}
    };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let has_first_column = generated.contains("password_digest");
    assert!(
        has_first_column,
        "first attr sensitive columns must be present: {generated}"
    );

    let has_second_column = generated.contains("api_key");
    assert!(
        has_second_column,
        "second attr sensitive columns must be present: {generated}"
    );
}
/// With a bare identifier inside the `sensitive` list, the expansion must contain
/// `compile_error` and must not contain `version_table_name`.
#[test]
fn repository_macro_versioned_non_string_in_sensitive_is_compile_error() {
    let repo_args = quote! { Post, versioned = true };
    let trait_item = quote! {
        #[version_history(sensitive = [password_digest])]
        pub trait PostRepository {}
    };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let emits_error = generated.contains("compile_error");
    assert!(
        emits_error,
        "non-string element in sensitive list must produce a compile error: {generated}"
    );

    let emits_impl = generated.contains("version_table_name");
    assert!(
        !emits_impl,
        "a compile-error output must not also contain a working impl: {generated}"
    );
}
/// With the misspelled key `sensitve` in `#[version_history(...)]`, the expansion
/// must contain `compile_error` or `unknown version_history`, and must not contain
/// the column name `password_digest`.
#[test]
fn repository_macro_versioned_typo_in_sensitive_attr_is_compile_error() {
    let generated = repository_macro(
        quote! { Post, versioned = true },
        quote! {
            #[version_history(sensitve = ["password_digest"])]
            pub trait PostRepository {}
        },
    )
    .to_string();
    assert!(
        generated.contains("compile_error") || generated.contains("unknown version_history"),
        "typo in sensitive attribute must produce a compile error, not silently succeed: {generated}"
    );
    // This check fails when the column name IS present in the output, so the message
    // describes that condition rather than an omission.
    assert!(
        !generated.contains("password_digest"),
        "output for a mistyped version_history key must not contain the column listed under it: {generated}"
    );
}
/// The `Post, versioned = true, no_versioned_record_impl` expansion must not
/// contain `version_table_name` but must still contain `version_history`.
#[test]
fn repository_macro_no_versioned_record_impl_suppresses_generated_impl() {
    let repo_args = quote! { Post, versioned = true, no_versioned_record_impl };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let emits_impl = generated.contains("version_table_name");
    assert!(
        !emits_impl,
        "no_versioned_record_impl must suppress the generated impl block: {generated}"
    );

    let emits_history_query = generated.contains("version_history");
    assert!(
        emits_history_query,
        "version_history query method must still be generated: {generated}"
    );
}
/// The `Post, versioned = true, no_versioned_record_impl` expansion must contain
/// both `version_column_values` and `version_record_id`.
#[test]
fn repository_macro_no_versioned_record_impl_uses_trait_hooks_for_history_writes() {
    let repo_args = quote! { Post, versioned = true, no_versioned_record_impl };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let calls_column_values = generated.contains("version_column_values");
    assert!(
        calls_column_values,
        "history writes must call VersionedRecord::version_column_values(): {generated}"
    );

    let calls_record_id = generated.contains("version_record_id");
    assert!(
        calls_record_id,
        "history writes must call VersionedRecord::version_record_id(): {generated}"
    );
}
/// The plain `Post` expansion must not contain `VersionedRecord`.
#[test]
fn repository_macro_non_versioned_does_not_emit_versioned_record_impl() {
    let repo_args = quote! { Post };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let mentions_versioned_record = generated.contains("VersionedRecord");
    assert!(
        !mentions_versioned_record,
        "non-versioned repository must not generate VersionedRecord impl: {generated}"
    );
}
/// The plain `Post` expansion must contain both `find_by_id` and `save`.
#[test]
fn repository_macro_non_versioned_does_not_regress() {
    let repo_args = quote! { Post };
    let trait_item = quote! { pub trait PostRepository {} };
    let generated = repository_macro(repo_args, trait_item).to_string();

    let has_find_by_id = generated.contains("find_by_id");
    assert!(
        has_find_by_id,
        "non-versioned repository must still generate find_by_id"
    );

    let has_save = generated.contains("save");
    assert!(
        has_save,
        "non-versioned repository must still generate save"
    );
}
}