es_entity/operation/hooks.rs
1//! Commit hooks for executing custom logic before and after transaction commits.
2//!
3//! This module provides the [`CommitHook`] trait and supporting types that allow you to
4//! register hooks that execute during the commit lifecycle of a transaction. This is useful for:
5//!
6//! - Publishing events to message queues after successful commits
7//! - Updating caches
8//! - Triggering side effects that should only occur if the transaction succeeds
9//! - Accumulating operations across multiple entity updates in a transaction
10//!
11//! # Hook Lifecycle
12//!
13//! 1. **Registration**: Hooks are registered using [`AtomicOperation::add_commit_hook()`]
14//! 2. **Merging**: Multiple hooks of the same type may be merged via [`CommitHook::merge()`]
15//! 3. **Pre-commit**: [`CommitHook::pre_commit()`] executes before the transaction commits
16//! 4. **Commit**: The underlying database transaction is committed
17//! 5. **Post-commit**: [`CommitHook::post_commit()`] executes after successful commit
18//!
19//! # Examples
20//!
21//! ## Hook with Database Operations and Channel-Based Publishing
22//!
23//! This example shows a complete event publishing hook that:
24//! - Stores events in the database during pre-commit (within the transaction)
25//! - Sends events to a channel during post-commit for async processing
26//! - Merges multiple hook instances to batch operations
27//!
28//! Note: `post_commit()` is synchronous and cannot fail, so it's best used for
29//! fire-and-forget operations like sending to channels. A background task can then
30//! handle the async work of publishing to external systems.
31//!
32//! ```
33//! use es_entity::{AtomicOperation, operation::hooks::{CommitHook, HookOperation, PreCommitRet}};
34//!
35//! #[derive(Debug, Clone)]
36//! struct Event {
37//! entity_id: uuid::Uuid,
38//! event_type: String,
39//! }
40//!
41//! #[derive(Debug)]
42//! struct EventPublisher {
43//! events: Vec<Event>,
44//! // Channel sender for publishing events to a background processor
45//! // In production, this might be tokio::sync::mpsc::Sender or similar
46//! tx: std::sync::mpsc::Sender<Event>,
47//! }
48//!
49//! impl CommitHook for EventPublisher {
50//! async fn pre_commit(self, mut op: HookOperation<'_>)
51//! -> Result<PreCommitRet<'_, Self>, sqlx::Error>
52//! {
53//! // Store events in the database within the transaction
54//! // If the transaction fails, these inserts will be rolled back
55//! for event in &self.events {
56//! sqlx::query!(
57//! "INSERT INTO hook_events (entity_id, event_type, created_at) VALUES ($1, $2, NOW())",
58//! event.entity_id,
59//! event.event_type
60//! )
61//! .execute(op.as_executor())
62//! .await?;
63//! }
64//!
65//! PreCommitRet::ok(self, op)
66//! }
67//!
68//! fn post_commit(self) {
69//! // Send events to a channel for async processing
70//! // This only runs if the transaction succeeded
71//! // Channel sends are fast and don't block; a background task handles publishing
72//! for event in self.events {
73//! // In production, handle send failures appropriately (logging, metrics, etc.)
74//! // The channel might be bounded to apply backpressure
75//! let _ = self.tx.send(event);
76//! }
77//! }
78//!
79//! fn merge(&mut self, other: &mut Self) -> bool {
80//! // Merge multiple EventPublisher hooks into one to batch operations
81//! self.events.append(&mut other.events);
82//! true
83//! }
84//! }
85//!
86//! // Separate background task for async event publishing
87//! // async fn event_publisher_task(mut rx: tokio::sync::mpsc::Receiver<Event>) {
88//! // while let Some(event) = rx.recv().await {
89//! // // Publish to Kafka, RabbitMQ, webhooks, etc.
90//! // // Handle failures with retries, dead-letter queues, etc.
91//! // match publish_to_external_system(&event).await {
92//! // Ok(_) => log::info!("Published event: {:?}", event),
93//! // Err(e) => log::error!("Failed to publish event: {:?}", e),
94//! // }
95//! // }
96//! // }
97//! ```
98//!
99//! ## Usage
100//!
101//! ```no_run
102//! # use es_entity::{AtomicOperation, DbOp, operation::hooks::{CommitHook, HookOperation, PreCommitRet}};
103//! # use es_entity::db;
104//! # #[derive(Debug, Clone)]
105//! # struct Event { entity_id: uuid::Uuid, event_type: String }
106//! # #[derive(Debug)]
107//! # struct EventPublisher { events: Vec<Event>, tx: std::sync::mpsc::Sender<Event> }
108//! # impl CommitHook for EventPublisher {
109//! # async fn pre_commit(self, mut op: HookOperation<'_>) -> Result<PreCommitRet<'_, Self>, sqlx::Error> {
110//! # for event in &self.events {
111//! # sqlx::query!(
112//! # "INSERT INTO hook_events (entity_id, event_type, created_at) VALUES ($1, $2, NOW())",
113//! # event.entity_id, event.event_type
114//! # ).execute(op.as_executor()).await?;
115//! # }
116//! # PreCommitRet::ok(self, op)
117//! # }
118//! # fn post_commit(self) { for event in self.events { let _ = self.tx.send(event); } }
119//! # fn merge(&mut self, other: &mut Self) -> bool { self.events.append(&mut other.events); true }
120//! # }
121//! # async fn example(pool: db::Pool) -> Result<(), sqlx::Error> {
122//! let user_id = uuid::Uuid::nil();
123//! let (tx, _rx) = std::sync::mpsc::channel();
124//! let mut op = DbOp::init(&pool).await?;
125//!
126//! // Add first hook
127//! op.add_commit_hook(EventPublisher {
128//! events: vec![Event { entity_id: user_id, event_type: "user.created".to_string() }],
129//! tx: tx.clone(),
130//! }).expect("could not add hook");
131//!
132//! // Add second hook - will merge with the first
133//! op.add_commit_hook(EventPublisher {
134//! events: vec![Event { entity_id: user_id, event_type: "email.sent".to_string() }],
135//! tx: tx.clone(),
136//! }).expect("could not add hook");
137//!
138//! // Both hooks merge into one, events are stored in DB, then sent to channel
139//! op.commit().await?;
140//! # Ok(())
141//! # }
142//! ```
143
144use std::{
145 any::{Any, TypeId},
146 collections::HashMap,
147 future::Future,
148 pin::Pin,
149};
150
151use crate::db;
152
153use super::AtomicOperation;
154
155/// Type alias for boxed async futures.
156pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
157
158/// Trait for implementing custom commit hooks that execute before and after transaction commits.
159///
160/// Hooks execute in order: [`pre_commit()`](Self::pre_commit) → database commit → [`post_commit()`](Self::post_commit).
161/// Multiple hooks of the same type can be merged via [`merge()`](Self::merge).
162///
163/// See module-level documentation for a complete example.
164pub trait CommitHook: Send + 'static + Sized {
165 /// Called before the transaction commits. Can perform database operations.
166 ///
167 /// Errors returned here will roll back the transaction.
168 fn pre_commit(
169 self,
170 op: HookOperation<'_>,
171 ) -> impl Future<Output = Result<PreCommitRet<'_, Self>, sqlx::Error>> + Send {
172 async { PreCommitRet::ok(self, op) }
173 }
174
175 /// Called after successful commit. Cannot fail, not async.
176 fn post_commit(self) {
177 // Default: do nothing
178 }
179
180 /// Try to merge another hook of the same type into this one.
181 ///
182 /// Returns `true` if merged (other will be dropped), `false` if not (both execute separately).
183 fn merge(&mut self, _other: &mut Self) -> bool {
184 false
185 }
186
187 /// Execute the hook immediately, bypassing the hook system.
188 ///
189 /// Useful when [`AtomicOperation::add_commit_hook()`] returns `Err(hook)`.
190 fn force_execute_pre_commit(
191 self,
192 op: &mut impl AtomicOperation,
193 ) -> impl Future<Output = Result<Self, sqlx::Error>> + Send {
194 async {
195 let hook_op = HookOperation::new(op);
196 Ok(self.pre_commit(hook_op).await?.hook)
197 }
198 }
199}
200
201/// Wrapper around a database connection passed to [`CommitHook::pre_commit()`].
202///
203/// Implements [`AtomicOperation`] to allow executing database queries within the transaction.
204pub struct HookOperation<'c> {
205 now: Option<chrono::DateTime<chrono::Utc>>,
206 conn: &'c mut db::Connection,
207}
208
209impl<'c> HookOperation<'c> {
210 fn new(op: &'c mut impl AtomicOperation) -> Self {
211 Self {
212 now: op.maybe_now(),
213 conn: op.as_executor(),
214 }
215 }
216}
217
218impl<'c> AtomicOperation for HookOperation<'c> {
219 fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
220 self.now
221 }
222
223 fn as_executor(&mut self) -> &mut db::Connection {
224 self.conn
225 }
226}
227
228/// Return type for [`CommitHook::pre_commit()`].
229///
230/// Use [`PreCommitRet::ok()`] to construct: `PreCommitRet::ok(self, op)`.
231pub struct PreCommitRet<'c, H> {
232 op: HookOperation<'c>,
233 hook: H,
234}
235
236impl<'c, H> PreCommitRet<'c, H> {
237 /// Creates a successful pre-commit result.
238 pub fn ok(hook: H, op: HookOperation<'c>) -> Result<Self, sqlx::Error> {
239 Ok(Self { op, hook })
240 }
241}
242
243// --- Object-safe internal trait ---
244trait DynHook: Send {
245 #[allow(clippy::type_complexity)]
246 fn pre_commit_boxed<'c>(
247 self: Box<Self>,
248 op: HookOperation<'c>,
249 ) -> BoxFuture<'c, Result<(HookOperation<'c>, Box<dyn DynHook>), sqlx::Error>>;
250
251 fn post_commit_boxed(self: Box<Self>);
252
253 fn try_merge(&mut self, other: &mut dyn DynHook) -> bool;
254
255 fn as_any_mut(&mut self) -> &mut dyn Any;
256}
257
258impl<H: CommitHook> DynHook for H {
259 fn pre_commit_boxed<'c>(
260 self: Box<Self>,
261 op: HookOperation<'c>,
262 ) -> BoxFuture<'c, Result<(HookOperation<'c>, Box<dyn DynHook>), sqlx::Error>> {
263 Box::pin(async move {
264 let ret = self.pre_commit(op).await?;
265 Ok((ret.op, Box::new(ret.hook) as Box<dyn DynHook>))
266 })
267 }
268
269 fn post_commit_boxed(self: Box<Self>) {
270 (*self).post_commit()
271 }
272
273 fn try_merge(&mut self, other: &mut dyn DynHook) -> bool {
274 let other_h = other
275 .as_any_mut()
276 .downcast_mut::<H>()
277 .expect("hook type mismatch");
278 self.merge(other_h)
279 }
280
281 fn as_any_mut(&mut self) -> &mut dyn Any {
282 self
283 }
284}
285
286pub(crate) struct CommitHooks {
287 hooks: HashMap<TypeId, Vec<Box<dyn DynHook>>>,
288}
289
290impl CommitHooks {
291 pub fn new() -> Self {
292 Self {
293 hooks: HashMap::new(),
294 }
295 }
296
297 pub(super) fn add<H: CommitHook>(&mut self, hook: H) {
298 let type_id = TypeId::of::<H>();
299 let hooks_vec = self.hooks.entry(type_id).or_default();
300
301 let mut new_hook: Box<dyn DynHook> = Box::new(hook);
302
303 if let Some(last) = hooks_vec.last_mut()
304 && last.try_merge(new_hook.as_mut())
305 {
306 return;
307 }
308
309 hooks_vec.push(new_hook);
310 }
311
312 pub(super) async fn execute_pre(
313 mut self,
314 op: &mut impl AtomicOperation,
315 ) -> Result<PostCommitHooks, sqlx::Error> {
316 let mut op = HookOperation::new(op);
317 let mut post_hooks = Vec::new();
318
319 for (_, hooks_vec) in self.hooks.drain() {
320 for hook in hooks_vec {
321 let (new_op, hook) = hook.pre_commit_boxed(op).await?;
322 op = new_op;
323 post_hooks.push(hook);
324 }
325 }
326
327 Ok(PostCommitHooks { hooks: post_hooks })
328 }
329}
330
331impl Default for CommitHooks {
332 fn default() -> Self {
333 Self::new()
334 }
335}
336
337pub struct PostCommitHooks {
338 hooks: Vec<Box<dyn DynHook>>,
339}
340
341impl PostCommitHooks {
342 pub(super) fn execute(self) {
343 for hook in self.hooks {
344 hook.post_commit_boxed();
345 }
346 }
347}