es_entity/operation/
hooks.rs

1//! Commit hooks for executing custom logic before and after transaction commits.
2//!
3//! This module provides the [`CommitHook`] trait and supporting types that allow you to
4//! register hooks that execute during the commit lifecycle of a transaction. This is useful for:
5//!
6//! - Publishing events to message queues after successful commits
7//! - Updating caches
8//! - Triggering side effects that should only occur if the transaction succeeds
9//! - Accumulating operations across multiple entity updates in a transaction
10//!
11//! # Hook Lifecycle
12//!
13//! 1. **Registration**: Hooks are registered using [`AtomicOperation::add_commit_hook()`]
14//! 2. **Merging**: Multiple hooks of the same type may be merged via [`CommitHook::merge()`]
15//! 3. **Pre-commit**: [`CommitHook::pre_commit()`] executes before the transaction commits
16//! 4. **Commit**: The underlying database transaction is committed
17//! 5. **Post-commit**: [`CommitHook::post_commit()`] executes after successful commit
18//!
19//! # Examples
20//!
21//! ## Hook with Database Operations and Channel-Based Publishing
22//!
23//! This example shows a complete event publishing hook that:
24//! - Stores events in the database during pre-commit (within the transaction)
25//! - Sends events to a channel during post-commit for async processing
26//! - Merges multiple hook instances to batch operations
27//!
28//! Note: `post_commit()` is synchronous and cannot fail, so it's best used for
29//! fire-and-forget operations like sending to channels. A background task can then
30//! handle the async work of publishing to external systems.
31//!
32//! ```
33//! use es_entity::{AtomicOperation, operation::hooks::{CommitHook, HookOperation, PreCommitRet}};
34//!
35//! #[derive(Debug, Clone)]
36//! struct Event {
37//!     entity_id: uuid::Uuid,
38//!     event_type: String,
39//! }
40//!
41//! #[derive(Debug)]
42//! struct EventPublisher {
43//!     events: Vec<Event>,
44//!     // Channel sender for publishing events to a background processor
45//!     // In production, this might be tokio::sync::mpsc::Sender or similar
46//!     tx: std::sync::mpsc::Sender<Event>,
47//! }
48//!
49//! impl CommitHook for EventPublisher {
50//!     async fn pre_commit(self, mut op: HookOperation<'_>)
51//!         -> Result<PreCommitRet<'_, Self>, sqlx::Error>
52//!     {
53//!         // Store events in the database within the transaction
54//!         // If the transaction fails, these inserts will be rolled back
55//!         for event in &self.events {
56//!             sqlx::query!(
57//!                 "INSERT INTO hook_events (entity_id, event_type, created_at) VALUES ($1, $2, NOW())",
58//!                 event.entity_id,
59//!                 event.event_type
60//!             )
61//!             .execute(op.as_executor())
62//!             .await?;
63//!         }
64//!
65//!         PreCommitRet::ok(self, op)
66//!     }
67//!
68//!     fn post_commit(self) {
69//!         // Send events to a channel for async processing
70//!         // This only runs if the transaction succeeded
71//!         // Channel sends are fast and don't block; a background task handles publishing
72//!         for event in self.events {
73//!             // In production, handle send failures appropriately (logging, metrics, etc.)
74//!             // The channel might be bounded to apply backpressure
75//!             let _ = self.tx.send(event);
76//!         }
77//!     }
78//!
79//!     fn merge(&mut self, other: &mut Self) -> bool {
80//!         // Merge multiple EventPublisher hooks into one to batch operations
81//!         self.events.append(&mut other.events);
82//!         true
83//!     }
84//! }
85//!
86//! // Separate background task for async event publishing
87//! // async fn event_publisher_task(mut rx: tokio::sync::mpsc::Receiver<Event>) {
88//! //     while let Some(event) = rx.recv().await {
89//! //         // Publish to Kafka, RabbitMQ, webhooks, etc.
90//! //         // Handle failures with retries, dead-letter queues, etc.
91//! //         match publish_to_external_system(&event).await {
92//! //             Ok(_) => log::info!("Published event: {:?}", event),
93//! //             Err(e) => log::error!("Failed to publish event: {:?}", e),
94//! //         }
95//! //     }
96//! // }
97//! ```
98//!
99//! ## Usage
100//!
101//! ```no_run
102//! # use es_entity::{AtomicOperation, DbOp, operation::hooks::{CommitHook, HookOperation, PreCommitRet}};
103//! # use sqlx::PgPool;
104//! # #[derive(Debug, Clone)]
105//! # struct Event { entity_id: uuid::Uuid, event_type: String }
106//! # #[derive(Debug)]
107//! # struct EventPublisher { events: Vec<Event>, tx: std::sync::mpsc::Sender<Event> }
108//! # impl CommitHook for EventPublisher {
109//! #     async fn pre_commit(self, mut op: HookOperation<'_>) -> Result<PreCommitRet<'_, Self>, sqlx::Error> {
110//! #         for event in &self.events {
111//! #             sqlx::query!(
112//! #                 "INSERT INTO hook_events (entity_id, event_type, created_at) VALUES ($1, $2, NOW())",
113//! #                 event.entity_id, event.event_type
114//! #             ).execute(op.as_executor()).await?;
115//! #         }
116//! #         PreCommitRet::ok(self, op)
117//! #     }
118//! #     fn post_commit(self) { for event in self.events { let _ = self.tx.send(event); } }
119//! #     fn merge(&mut self, other: &mut Self) -> bool { self.events.append(&mut other.events); true }
120//! # }
121//! # async fn example(pool: PgPool) -> Result<(), sqlx::Error> {
122//! let user_id = uuid::Uuid::nil();
123//! let (tx, _rx) = std::sync::mpsc::channel();
124//! let mut op = DbOp::init(&pool).await?;
125//!
126//! // Add first hook
127//! op.add_commit_hook(EventPublisher {
128//!     events: vec![Event { entity_id: user_id, event_type: "user.created".to_string() }],
129//!     tx: tx.clone(),
130//! }).expect("could not add hook");
131//!
132//! // Add second hook - will merge with the first
133//! op.add_commit_hook(EventPublisher {
134//!     events: vec![Event { entity_id: user_id, event_type: "email.sent".to_string() }],
135//!     tx: tx.clone(),
136//! }).expect("could not add hook");
137//!
138//! // Both hooks merge into one, events are stored in DB, then sent to channel
139//! op.commit().await?;
140//! # Ok(())
141//! # }
142//! ```
143
144use std::{
145    any::{Any, TypeId},
146    collections::HashMap,
147    future::Future,
148    pin::Pin,
149};
150
151use super::AtomicOperation;
152
153/// Type alias for boxed async futures.
154pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
155
156/// Trait for implementing custom commit hooks that execute before and after transaction commits.
157///
158/// Hooks execute in order: [`pre_commit()`](Self::pre_commit) → database commit → [`post_commit()`](Self::post_commit).
159/// Multiple hooks of the same type can be merged via [`merge()`](Self::merge).
160///
161/// See module-level documentation for a complete example.
162pub trait CommitHook: Send + 'static + Sized {
163    /// Called before the transaction commits. Can perform database operations.
164    ///
165    /// Errors returned here will roll back the transaction.
166    fn pre_commit(
167        self,
168        op: HookOperation<'_>,
169    ) -> impl Future<Output = Result<PreCommitRet<'_, Self>, sqlx::Error>> + Send {
170        async { PreCommitRet::ok(self, op) }
171    }
172
173    /// Called after successful commit. Cannot fail, not async.
174    fn post_commit(self) {
175        // Default: do nothing
176    }
177
178    /// Try to merge another hook of the same type into this one.
179    ///
180    /// Returns `true` if merged (other will be dropped), `false` if not (both execute separately).
181    fn merge(&mut self, _other: &mut Self) -> bool {
182        false
183    }
184
185    /// Execute the hook immediately, bypassing the hook system.
186    ///
187    /// Useful when [`AtomicOperation::add_commit_hook()`] returns `Err(hook)`.
188    fn force_execute_pre_commit(
189        self,
190        op: &mut impl AtomicOperation,
191    ) -> impl Future<Output = Result<Self, sqlx::Error>> + Send {
192        async {
193            let hook_op = HookOperation::new(op);
194            Ok(self.pre_commit(hook_op).await?.hook)
195        }
196    }
197}
198
199/// Wrapper around a database connection passed to [`CommitHook::pre_commit()`].
200///
201/// Implements [`AtomicOperation`] to allow executing database queries within the transaction.
202pub struct HookOperation<'c> {
203    now: Option<chrono::DateTime<chrono::Utc>>,
204    conn: &'c mut sqlx::PgConnection,
205}
206
207impl<'c> HookOperation<'c> {
208    fn new(op: &'c mut impl AtomicOperation) -> Self {
209        Self {
210            now: op.maybe_now(),
211            conn: op.as_executor(),
212        }
213    }
214}
215
216impl<'c> AtomicOperation for HookOperation<'c> {
217    fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
218        self.now
219    }
220
221    fn as_executor(&mut self) -> &mut sqlx::PgConnection {
222        self.conn
223    }
224}
225
226/// Return type for [`CommitHook::pre_commit()`].
227///
228/// Use [`PreCommitRet::ok()`] to construct: `PreCommitRet::ok(self, op)`.
229pub struct PreCommitRet<'c, H> {
230    op: HookOperation<'c>,
231    hook: H,
232}
233
234impl<'c, H> PreCommitRet<'c, H> {
235    /// Creates a successful pre-commit result.
236    pub fn ok(hook: H, op: HookOperation<'c>) -> Result<Self, sqlx::Error> {
237        Ok(Self { op, hook })
238    }
239}
240
241// --- Object-safe internal trait ---
242trait DynHook: Send {
243    #[allow(clippy::type_complexity)]
244    fn pre_commit_boxed<'c>(
245        self: Box<Self>,
246        op: HookOperation<'c>,
247    ) -> BoxFuture<'c, Result<(HookOperation<'c>, Box<dyn DynHook>), sqlx::Error>>;
248
249    fn post_commit_boxed(self: Box<Self>);
250
251    fn try_merge(&mut self, other: &mut dyn DynHook) -> bool;
252
253    fn as_any_mut(&mut self) -> &mut dyn Any;
254}
255
256impl<H: CommitHook> DynHook for H {
257    fn pre_commit_boxed<'c>(
258        self: Box<Self>,
259        op: HookOperation<'c>,
260    ) -> BoxFuture<'c, Result<(HookOperation<'c>, Box<dyn DynHook>), sqlx::Error>> {
261        Box::pin(async move {
262            let ret = self.pre_commit(op).await?;
263            Ok((ret.op, Box::new(ret.hook) as Box<dyn DynHook>))
264        })
265    }
266
267    fn post_commit_boxed(self: Box<Self>) {
268        (*self).post_commit()
269    }
270
271    fn try_merge(&mut self, other: &mut dyn DynHook) -> bool {
272        let other_h = other
273            .as_any_mut()
274            .downcast_mut::<H>()
275            .expect("hook type mismatch");
276        self.merge(other_h)
277    }
278
279    fn as_any_mut(&mut self) -> &mut dyn Any {
280        self
281    }
282}
283
284pub(crate) struct CommitHooks {
285    hooks: HashMap<TypeId, Vec<Box<dyn DynHook>>>,
286}
287
288impl CommitHooks {
289    pub fn new() -> Self {
290        Self {
291            hooks: HashMap::new(),
292        }
293    }
294
295    pub(super) fn add<H: CommitHook>(&mut self, hook: H) {
296        let type_id = TypeId::of::<H>();
297        let hooks_vec = self.hooks.entry(type_id).or_default();
298
299        let mut new_hook: Box<dyn DynHook> = Box::new(hook);
300
301        if let Some(last) = hooks_vec.last_mut()
302            && last.try_merge(new_hook.as_mut())
303        {
304            return;
305        }
306
307        hooks_vec.push(new_hook);
308    }
309
310    pub(super) async fn execute_pre(
311        mut self,
312        op: &mut impl AtomicOperation,
313    ) -> Result<PostCommitHooks, sqlx::Error> {
314        let mut op = HookOperation::new(op);
315        let mut post_hooks = Vec::new();
316
317        for (_, hooks_vec) in self.hooks.drain() {
318            for hook in hooks_vec {
319                let (new_op, hook) = hook.pre_commit_boxed(op).await?;
320                op = new_op;
321                post_hooks.push(hook);
322            }
323        }
324
325        Ok(PostCommitHooks { hooks: post_hooks })
326    }
327}
328
329impl Default for CommitHooks {
330    fn default() -> Self {
331        Self::new()
332    }
333}
334
335pub struct PostCommitHooks {
336    hooks: Vec<Box<dyn DynHook>>,
337}
338
339impl PostCommitHooks {
340    pub(super) fn execute(self) {
341        for hook in self.hooks {
342            hook.post_commit_boxed();
343        }
344    }
345}