es_entity/operation/hooks.rs
1//! Commit hooks for executing custom logic before and after transaction commits.
2//!
3//! This module provides the [`CommitHook`] trait and supporting types that allow you to
4//! register hooks that execute during the commit lifecycle of a transaction. This is useful for:
5//!
6//! - Publishing events to message queues after successful commits
7//! - Updating caches
8//! - Triggering side effects that should only occur if the transaction succeeds
9//! - Accumulating operations across multiple entity updates in a transaction
10//!
11//! # Hook Lifecycle
12//!
13//! 1. **Registration**: Hooks are registered using [`AtomicOperation::add_commit_hook()`]
14//! 2. **Merging**: Multiple hooks of the same type may be merged via [`CommitHook::merge()`]
15//! 3. **Pre-commit**: [`CommitHook::pre_commit()`] executes before the transaction commits
16//! 4. **Commit**: The underlying database transaction is committed
17//! 5. **Post-commit**: [`CommitHook::post_commit()`] executes after successful commit
18//!
19//! # Examples
20//!
21//! ## Hook with Database Operations and Channel-Based Publishing
22//!
23//! This example shows a complete event publishing hook that:
24//! - Stores events in the database during pre-commit (within the transaction)
25//! - Sends events to a channel during post-commit for async processing
26//! - Merges multiple hook instances to batch operations
27//!
28//! Note: `post_commit()` is synchronous and cannot fail, so it's best used for
29//! fire-and-forget operations like sending to channels. A background task can then
30//! handle the async work of publishing to external systems.
31//!
32//! ```
33//! use es_entity::{AtomicOperation, operation::hooks::{CommitHook, HookOperation, PreCommitRet}};
34//!
35//! #[derive(Debug, Clone)]
36//! struct Event {
37//! entity_id: uuid::Uuid,
38//! event_type: String,
39//! }
40//!
41//! #[derive(Debug)]
42//! struct EventPublisher {
43//! events: Vec<Event>,
44//! // Channel sender for publishing events to a background processor
45//! // In production, this might be tokio::sync::mpsc::Sender or similar
46//! tx: std::sync::mpsc::Sender<Event>,
47//! }
48//!
49//! impl CommitHook for EventPublisher {
50//! async fn pre_commit(self, mut op: HookOperation<'_>)
51//! -> Result<PreCommitRet<'_, Self>, sqlx::Error>
52//! {
53//! // Store events in the database within the transaction
54//! // If the transaction fails, these inserts will be rolled back
55//! for event in &self.events {
56//! sqlx::query!(
57//! "INSERT INTO hook_events (entity_id, event_type, created_at) VALUES ($1, $2, NOW())",
58//! event.entity_id,
59//! event.event_type
60//! )
61//! .execute(op.as_executor())
62//! .await?;
63//! }
64//!
65//! PreCommitRet::ok(self, op)
66//! }
67//!
68//! fn post_commit(self) {
69//! // Send events to a channel for async processing
70//! // This only runs if the transaction succeeded
71//! // Channel sends are fast and don't block; a background task handles publishing
72//! for event in self.events {
73//! // In production, handle send failures appropriately (logging, metrics, etc.)
74//! // The channel might be bounded to apply backpressure
75//! let _ = self.tx.send(event);
76//! }
77//! }
78//!
79//! fn merge(&mut self, other: &mut Self) -> bool {
80//! // Merge multiple EventPublisher hooks into one to batch operations
81//! self.events.append(&mut other.events);
82//! true
83//! }
84//! }
85//!
86//! // Separate background task for async event publishing
87//! // async fn event_publisher_task(mut rx: tokio::sync::mpsc::Receiver<Event>) {
88//! // while let Some(event) = rx.recv().await {
89//! // // Publish to Kafka, RabbitMQ, webhooks, etc.
90//! // // Handle failures with retries, dead-letter queues, etc.
91//! // match publish_to_external_system(&event).await {
92//! // Ok(_) => log::info!("Published event: {:?}", event),
93//! // Err(e) => log::error!("Failed to publish event: {:?}", e),
94//! // }
95//! // }
96//! // }
97//! ```
98//!
99//! ## Usage
100//!
101//! ```no_run
102//! # use es_entity::{AtomicOperation, DbOp, operation::hooks::{CommitHook, HookOperation, PreCommitRet}};
103//! # use sqlx::PgPool;
104//! # #[derive(Debug, Clone)]
105//! # struct Event { entity_id: uuid::Uuid, event_type: String }
106//! # #[derive(Debug)]
107//! # struct EventPublisher { events: Vec<Event>, tx: std::sync::mpsc::Sender<Event> }
108//! # impl CommitHook for EventPublisher {
109//! # async fn pre_commit(self, mut op: HookOperation<'_>) -> Result<PreCommitRet<'_, Self>, sqlx::Error> {
110//! # for event in &self.events {
111//! # sqlx::query!(
112//! # "INSERT INTO hook_events (entity_id, event_type, created_at) VALUES ($1, $2, NOW())",
113//! # event.entity_id, event.event_type
114//! # ).execute(op.as_executor()).await?;
115//! # }
116//! # PreCommitRet::ok(self, op)
117//! # }
118//! # fn post_commit(self) { for event in self.events { let _ = self.tx.send(event); } }
119//! # fn merge(&mut self, other: &mut Self) -> bool { self.events.append(&mut other.events); true }
120//! # }
121//! # async fn example(pool: PgPool) -> Result<(), sqlx::Error> {
122//! let user_id = uuid::Uuid::nil();
123//! let (tx, _rx) = std::sync::mpsc::channel();
124//! let mut op = DbOp::init(&pool).await?;
125//!
126//! // Add first hook
127//! op.add_commit_hook(EventPublisher {
128//! events: vec![Event { entity_id: user_id, event_type: "user.created".to_string() }],
129//! tx: tx.clone(),
130//! }).expect("could not add hook");
131//!
132//! // Add second hook - will merge with the first
133//! op.add_commit_hook(EventPublisher {
134//! events: vec![Event { entity_id: user_id, event_type: "email.sent".to_string() }],
135//! tx: tx.clone(),
136//! }).expect("could not add hook");
137//!
138//! // Both hooks merge into one, events are stored in DB, then sent to channel
139//! op.commit().await?;
140//! # Ok(())
141//! # }
142//! ```
143
144use std::{
145 any::{Any, TypeId},
146 collections::HashMap,
147 future::Future,
148 pin::Pin,
149};
150
151use super::AtomicOperation;
152
153/// Type alias for boxed async futures.
154pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
155
156/// Trait for implementing custom commit hooks that execute before and after transaction commits.
157///
158/// Hooks execute in order: [`pre_commit()`](Self::pre_commit) → database commit → [`post_commit()`](Self::post_commit).
159/// Multiple hooks of the same type can be merged via [`merge()`](Self::merge).
160///
161/// See module-level documentation for a complete example.
162pub trait CommitHook: Send + 'static + Sized {
163 /// Called before the transaction commits. Can perform database operations.
164 ///
165 /// Errors returned here will roll back the transaction.
166 fn pre_commit(
167 self,
168 op: HookOperation<'_>,
169 ) -> impl Future<Output = Result<PreCommitRet<'_, Self>, sqlx::Error>> + Send {
170 async { PreCommitRet::ok(self, op) }
171 }
172
173 /// Called after successful commit. Cannot fail, not async.
174 fn post_commit(self) {
175 // Default: do nothing
176 }
177
178 /// Try to merge another hook of the same type into this one.
179 ///
180 /// Returns `true` if merged (other will be dropped), `false` if not (both execute separately).
181 fn merge(&mut self, _other: &mut Self) -> bool {
182 false
183 }
184
185 /// Execute the hook immediately, bypassing the hook system.
186 ///
187 /// Useful when [`AtomicOperation::add_commit_hook()`] returns `Err(hook)`.
188 fn force_execute_pre_commit(
189 self,
190 op: &mut impl AtomicOperation,
191 ) -> impl Future<Output = Result<Self, sqlx::Error>> + Send {
192 async {
193 let hook_op = HookOperation::new(op);
194 Ok(self.pre_commit(hook_op).await?.hook)
195 }
196 }
197}
198
199/// Wrapper around a database connection passed to [`CommitHook::pre_commit()`].
200///
201/// Implements [`AtomicOperation`] to allow executing database queries within the transaction.
202pub struct HookOperation<'c> {
203 now: Option<chrono::DateTime<chrono::Utc>>,
204 conn: &'c mut sqlx::PgConnection,
205}
206
207impl<'c> HookOperation<'c> {
208 fn new(op: &'c mut impl AtomicOperation) -> Self {
209 Self {
210 now: op.maybe_now(),
211 conn: op.as_executor(),
212 }
213 }
214}
215
216impl<'c> AtomicOperation for HookOperation<'c> {
217 fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
218 self.now
219 }
220
221 fn as_executor(&mut self) -> &mut sqlx::PgConnection {
222 self.conn
223 }
224}
225
226/// Return type for [`CommitHook::pre_commit()`].
227///
228/// Use [`PreCommitRet::ok()`] to construct: `PreCommitRet::ok(self, op)`.
229pub struct PreCommitRet<'c, H> {
230 op: HookOperation<'c>,
231 hook: H,
232}
233
234impl<'c, H> PreCommitRet<'c, H> {
235 /// Creates a successful pre-commit result.
236 pub fn ok(hook: H, op: HookOperation<'c>) -> Result<Self, sqlx::Error> {
237 Ok(Self { op, hook })
238 }
239}
240
241// --- Object-safe internal trait ---
242trait DynHook: Send {
243 #[allow(clippy::type_complexity)]
244 fn pre_commit_boxed<'c>(
245 self: Box<Self>,
246 op: HookOperation<'c>,
247 ) -> BoxFuture<'c, Result<(HookOperation<'c>, Box<dyn DynHook>), sqlx::Error>>;
248
249 fn post_commit_boxed(self: Box<Self>);
250
251 fn try_merge(&mut self, other: &mut dyn DynHook) -> bool;
252
253 fn as_any_mut(&mut self) -> &mut dyn Any;
254}
255
256impl<H: CommitHook> DynHook for H {
257 fn pre_commit_boxed<'c>(
258 self: Box<Self>,
259 op: HookOperation<'c>,
260 ) -> BoxFuture<'c, Result<(HookOperation<'c>, Box<dyn DynHook>), sqlx::Error>> {
261 Box::pin(async move {
262 let ret = self.pre_commit(op).await?;
263 Ok((ret.op, Box::new(ret.hook) as Box<dyn DynHook>))
264 })
265 }
266
267 fn post_commit_boxed(self: Box<Self>) {
268 (*self).post_commit()
269 }
270
271 fn try_merge(&mut self, other: &mut dyn DynHook) -> bool {
272 let other_h = other
273 .as_any_mut()
274 .downcast_mut::<H>()
275 .expect("hook type mismatch");
276 self.merge(other_h)
277 }
278
279 fn as_any_mut(&mut self) -> &mut dyn Any {
280 self
281 }
282}
283
284pub(crate) struct CommitHooks {
285 hooks: HashMap<TypeId, Vec<Box<dyn DynHook>>>,
286}
287
288impl CommitHooks {
289 pub fn new() -> Self {
290 Self {
291 hooks: HashMap::new(),
292 }
293 }
294
295 pub(super) fn add<H: CommitHook>(&mut self, hook: H) {
296 let type_id = TypeId::of::<H>();
297 let hooks_vec = self.hooks.entry(type_id).or_default();
298
299 let mut new_hook: Box<dyn DynHook> = Box::new(hook);
300
301 if let Some(last) = hooks_vec.last_mut()
302 && last.try_merge(new_hook.as_mut())
303 {
304 return;
305 }
306
307 hooks_vec.push(new_hook);
308 }
309
310 pub(super) async fn execute_pre(
311 mut self,
312 op: &mut impl AtomicOperation,
313 ) -> Result<PostCommitHooks, sqlx::Error> {
314 let mut op = HookOperation::new(op);
315 let mut post_hooks = Vec::new();
316
317 for (_, hooks_vec) in self.hooks.drain() {
318 for hook in hooks_vec {
319 let (new_op, hook) = hook.pre_commit_boxed(op).await?;
320 op = new_op;
321 post_hooks.push(hook);
322 }
323 }
324
325 Ok(PostCommitHooks { hooks: post_hooks })
326 }
327}
328
329impl Default for CommitHooks {
330 fn default() -> Self {
331 Self::new()
332 }
333}
334
335pub struct PostCommitHooks {
336 hooks: Vec<Box<dyn DynHook>>,
337}
338
339impl PostCommitHooks {
340 pub(super) fn execute(self) {
341 for hook in self.hooks {
342 hook.post_commit_boxed();
343 }
344 }
345}