use proc_macro2::TokenStream;
use quote::{format_ident, quote};
use super::context::Context;
impl Context<'_> {
fn executor_tokens() -> TokenStream {
quote! { &mut *tx }
}
pub fn notify_created(&self) -> TokenStream {
if !self.streams {
return TokenStream::new();
}
let entity_name = self.entity_name;
let event_name = format_ident!("{}Event", entity_name);
let executor = Self::executor_tokens();
quote! {
let __event = #event_name::created(entity.clone());
let __payload = ::serde_json::to_string(&__event)
.expect("event serialization should not fail");
::sqlx::query("SELECT pg_notify($1, $2)")
.bind(#entity_name::CHANNEL)
.bind(&__payload)
.execute(#executor)
.await?;
}
}
pub fn notify_updated(&self) -> TokenStream {
if !self.streams {
return TokenStream::new();
}
let entity_name = self.entity_name;
let event_name = format_ident!("{}Event", entity_name);
let executor = Self::executor_tokens();
quote! {
let __event = #event_name::updated(__old_entity, entity.clone());
let __payload = ::serde_json::to_string(&__event)
.expect("event serialization should not fail");
::sqlx::query("SELECT pg_notify($1, $2)")
.bind(#entity_name::CHANNEL)
.bind(&__payload)
.execute(#executor)
.await?;
}
}
pub fn notify_hard_deleted(&self) -> TokenStream {
if !self.streams {
return TokenStream::new();
}
let entity_name = self.entity_name;
let event_name = format_ident!("{}Event", entity_name);
let executor = Self::executor_tokens();
quote! {
let __event = #event_name::hard_deleted(id.clone());
let __payload = ::serde_json::to_string(&__event)
.expect("event serialization should not fail");
::sqlx::query("SELECT pg_notify($1, $2)")
.bind(#entity_name::CHANNEL)
.bind(&__payload)
.execute(#executor)
.await?;
}
}
pub fn notify_soft_deleted(&self) -> TokenStream {
if !self.streams {
return TokenStream::new();
}
let entity_name = self.entity_name;
let event_name = format_ident!("{}Event", entity_name);
let executor = Self::executor_tokens();
quote! {
let __event = #event_name::SoftDeleted { id: id.clone() };
let __payload = ::serde_json::to_string(&__event)
.expect("event serialization should not fail");
::sqlx::query("SELECT pg_notify($1, $2)")
.bind(#entity_name::CHANNEL)
.bind(&__payload)
.execute(#executor)
.await?;
}
}
pub fn fetch_old_for_update(&self) -> TokenStream {
if !self.streams {
return TokenStream::new();
}
let entity_name = self.entity_name;
let row_name = &self.row_name;
let columns_str = &self.columns_str;
let table = &self.table;
let id_name = self.id_name;
let placeholder = self.dialect.placeholder(1);
let select_sql = format!(
"SELECT {columns_str} FROM {table} WHERE {id_name} = {placeholder} FOR UPDATE"
);
quote! {
let __old_row: #row_name = ::sqlx::query_as(#select_sql)
.bind(&id)
.fetch_optional(&mut *tx)
.await?
.ok_or_else(|| ::sqlx::Error::RowNotFound)?;
let __old_entity = #entity_name::from(__old_row);
}
}
}