Skip to main content

es_entity/operation/
hooks.rs

1//! Commit hooks for executing custom logic before and after transaction commits.
2//!
3//! This module provides the [`CommitHook`] trait and supporting types that allow you to
4//! register hooks that execute during the commit lifecycle of a transaction. This is useful for:
5//!
6//! - Publishing events to message queues after successful commits
7//! - Updating caches
8//! - Triggering side effects that should only occur if the transaction succeeds
9//! - Accumulating operations across multiple entity updates in a transaction
10//!
11//! # Hook Lifecycle
12//!
13//! 1. **Registration**: Hooks are registered using [`AtomicOperation::add_commit_hook()`]
14//! 2. **Merging**: Multiple hooks of the same type may be merged via [`CommitHook::merge()`]
15//! 3. **Pre-commit**: [`CommitHook::pre_commit()`] executes before the transaction commits
16//! 4. **Commit**: The underlying database transaction is committed
17//! 5. **Post-commit**: [`CommitHook::post_commit()`] executes after successful commit
18//!
19//! # Examples
20//!
21//! ## Hook with Database Operations and Channel-Based Publishing
22//!
23//! This example shows a complete event publishing hook that:
24//! - Stores events in the database during pre-commit (within the transaction)
25//! - Sends events to a channel during post-commit for async processing
26//! - Merges multiple hook instances to batch operations
27//!
28//! Note: `post_commit()` is synchronous and cannot fail, so it's best used for
29//! fire-and-forget operations like sending to channels. A background task can then
30//! handle the async work of publishing to external systems.
31//!
32//! ```
33//! use es_entity::{AtomicOperation, operation::hooks::{CommitHook, HookOperation, PreCommitRet}};
34//!
35//! #[derive(Debug, Clone)]
36//! struct Event {
37//!     entity_id: uuid::Uuid,
38//!     event_type: String,
39//! }
40//!
41//! #[derive(Debug)]
42//! struct EventPublisher {
43//!     events: Vec<Event>,
44//!     // Channel sender for publishing events to a background processor
45//!     // In production, this might be tokio::sync::mpsc::Sender or similar
46//!     tx: std::sync::mpsc::Sender<Event>,
47//! }
48//!
49//! impl CommitHook for EventPublisher {
50//!     async fn pre_commit(self, mut op: HookOperation<'_>)
51//!         -> Result<PreCommitRet<'_, Self>, sqlx::Error>
52//!     {
53//!         // Store events in the database within the transaction
54//!         // If the transaction fails, these inserts will be rolled back
55//!         for event in &self.events {
56//!             sqlx::query!(
57//!                 "INSERT INTO hook_events (entity_id, event_type, created_at) VALUES ($1, $2, NOW())",
58//!                 event.entity_id,
59//!                 event.event_type
60//!             )
61//!             .execute(op.as_executor())
62//!             .await?;
63//!         }
64//!
65//!         PreCommitRet::ok(self, op)
66//!     }
67//!
68//!     fn post_commit(self) {
69//!         // Send events to a channel for async processing
70//!         // This only runs if the transaction succeeded
71//!         // Channel sends are fast and don't block; a background task handles publishing
72//!         for event in self.events {
73//!             // In production, handle send failures appropriately (logging, metrics, etc.)
74//!             // The channel might be bounded to apply backpressure
75//!             let _ = self.tx.send(event);
76//!         }
77//!     }
78//!
79//!     fn merge(&mut self, other: &mut Self) -> bool {
80//!         // Merge multiple EventPublisher hooks into one to batch operations
81//!         self.events.append(&mut other.events);
82//!         true
83//!     }
84//! }
85//!
86//! // Separate background task for async event publishing
87//! // async fn event_publisher_task(mut rx: tokio::sync::mpsc::Receiver<Event>) {
88//! //     while let Some(event) = rx.recv().await {
89//! //         // Publish to Kafka, RabbitMQ, webhooks, etc.
90//! //         // Handle failures with retries, dead-letter queues, etc.
91//! //         match publish_to_external_system(&event).await {
92//! //             Ok(_) => log::info!("Published event: {:?}", event),
93//! //             Err(e) => log::error!("Failed to publish event: {:?}", e),
94//! //         }
95//! //     }
96//! // }
97//! ```
98//!
99//! ## Usage
100//!
101//! ```no_run
102//! # use es_entity::{AtomicOperation, DbOp, operation::hooks::{CommitHook, HookOperation, PreCommitRet}};
103//! # use es_entity::db;
104//! # #[derive(Debug, Clone)]
105//! # struct Event { entity_id: uuid::Uuid, event_type: String }
106//! # #[derive(Debug)]
107//! # struct EventPublisher { events: Vec<Event>, tx: std::sync::mpsc::Sender<Event> }
108//! # impl CommitHook for EventPublisher {
109//! #     async fn pre_commit(self, mut op: HookOperation<'_>) -> Result<PreCommitRet<'_, Self>, sqlx::Error> {
110//! #         for event in &self.events {
111//! #             sqlx::query!(
112//! #                 "INSERT INTO hook_events (entity_id, event_type, created_at) VALUES ($1, $2, NOW())",
113//! #                 event.entity_id, event.event_type
114//! #             ).execute(op.as_executor()).await?;
115//! #         }
116//! #         PreCommitRet::ok(self, op)
117//! #     }
118//! #     fn post_commit(self) { for event in self.events { let _ = self.tx.send(event); } }
119//! #     fn merge(&mut self, other: &mut Self) -> bool { self.events.append(&mut other.events); true }
120//! # }
121//! # async fn example(pool: db::Pool) -> Result<(), sqlx::Error> {
122//! let user_id = uuid::Uuid::nil();
123//! let (tx, _rx) = std::sync::mpsc::channel();
124//! let mut op = DbOp::init(&pool).await?;
125//!
126//! // Add first hook
127//! op.add_commit_hook(EventPublisher {
128//!     events: vec![Event { entity_id: user_id, event_type: "user.created".to_string() }],
129//!     tx: tx.clone(),
130//! }).expect("could not add hook");
131//!
132//! // Add second hook - will merge with the first
133//! op.add_commit_hook(EventPublisher {
134//!     events: vec![Event { entity_id: user_id, event_type: "email.sent".to_string() }],
135//!     tx: tx.clone(),
136//! }).expect("could not add hook");
137//!
138//! // Both hooks merge into one, events are stored in DB, then sent to channel
139//! op.commit().await?;
140//! # Ok(())
141//! # }
142//! ```
143
144use std::{
145    any::{Any, TypeId},
146    collections::HashMap,
147    future::Future,
148    pin::Pin,
149};
150
151use crate::db;
152
153use super::AtomicOperation;
154
/// A pinned, heap-allocated, type-erased future that is `Send` and may borrow for `'a`.
///
/// Used by the internal dynamic hook trait in this module, where the concrete future
/// type of a hook cannot be named.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
157
158/// Trait for implementing custom commit hooks that execute before and after transaction commits.
159///
160/// Hooks execute in order: [`pre_commit()`](Self::pre_commit) → database commit → [`post_commit()`](Self::post_commit).
161/// Multiple hooks of the same type can be merged via [`merge()`](Self::merge).
162///
163/// See module-level documentation for a complete example.
164pub trait CommitHook: Send + 'static + Sized {
165    /// Called before the transaction commits. Can perform database operations.
166    ///
167    /// Errors returned here will roll back the transaction.
168    fn pre_commit(
169        self,
170        op: HookOperation<'_>,
171    ) -> impl Future<Output = Result<PreCommitRet<'_, Self>, sqlx::Error>> + Send {
172        async { PreCommitRet::ok(self, op) }
173    }
174
175    /// Called after successful commit. Cannot fail, not async.
176    fn post_commit(self) {
177        // Default: do nothing
178    }
179
180    /// Try to merge another hook of the same type into this one.
181    ///
182    /// Returns `true` if merged (other will be dropped), `false` if not (both execute separately).
183    fn merge(&mut self, _other: &mut Self) -> bool {
184        false
185    }
186
187    /// Execute the hook immediately, bypassing the hook system.
188    ///
189    /// Useful when [`AtomicOperation::add_commit_hook()`] returns `Err(hook)`.
190    fn force_execute_pre_commit(
191        self,
192        op: &mut impl AtomicOperation,
193    ) -> impl Future<Output = Result<Self, sqlx::Error>> + Send {
194        async {
195            let hook_op = HookOperation::new(op);
196            Ok(self.pre_commit(hook_op).await?.hook)
197        }
198    }
199}
200
201/// Wrapper around a database connection passed to [`CommitHook::pre_commit()`].
202///
203/// Implements [`AtomicOperation`] to allow executing database queries within the transaction.
204pub struct HookOperation<'c> {
205    now: Option<chrono::DateTime<chrono::Utc>>,
206    conn: &'c mut db::Connection,
207}
208
209impl<'c> HookOperation<'c> {
210    fn new(op: &'c mut impl AtomicOperation) -> Self {
211        Self {
212            now: op.maybe_now(),
213            conn: op.as_executor(),
214        }
215    }
216}
217
218impl<'c> AtomicOperation for HookOperation<'c> {
219    fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
220        self.now
221    }
222
223    fn as_executor(&mut self) -> &mut db::Connection {
224        self.conn
225    }
226}
227
228/// Return type for [`CommitHook::pre_commit()`].
229///
230/// Use [`PreCommitRet::ok()`] to construct: `PreCommitRet::ok(self, op)`.
231pub struct PreCommitRet<'c, H> {
232    op: HookOperation<'c>,
233    hook: H,
234}
235
236impl<'c, H> PreCommitRet<'c, H> {
237    /// Creates a successful pre-commit result.
238    pub fn ok(hook: H, op: HookOperation<'c>) -> Result<Self, sqlx::Error> {
239        Ok(Self { op, hook })
240    }
241}
242
243// --- Object-safe internal trait ---
244trait DynHook: Send {
245    #[allow(clippy::type_complexity)]
246    fn pre_commit_boxed<'c>(
247        self: Box<Self>,
248        op: HookOperation<'c>,
249    ) -> BoxFuture<'c, Result<(HookOperation<'c>, Box<dyn DynHook>), sqlx::Error>>;
250
251    fn post_commit_boxed(self: Box<Self>);
252
253    fn try_merge(&mut self, other: &mut dyn DynHook) -> bool;
254
255    fn as_any_mut(&mut self) -> &mut dyn Any;
256}
257
258impl<H: CommitHook> DynHook for H {
259    fn pre_commit_boxed<'c>(
260        self: Box<Self>,
261        op: HookOperation<'c>,
262    ) -> BoxFuture<'c, Result<(HookOperation<'c>, Box<dyn DynHook>), sqlx::Error>> {
263        Box::pin(async move {
264            let ret = self.pre_commit(op).await?;
265            Ok((ret.op, Box::new(ret.hook) as Box<dyn DynHook>))
266        })
267    }
268
269    fn post_commit_boxed(self: Box<Self>) {
270        (*self).post_commit()
271    }
272
273    fn try_merge(&mut self, other: &mut dyn DynHook) -> bool {
274        let other_h = other
275            .as_any_mut()
276            .downcast_mut::<H>()
277            .expect("hook type mismatch");
278        self.merge(other_h)
279    }
280
281    fn as_any_mut(&mut self) -> &mut dyn Any {
282        self
283    }
284}
285
286pub(crate) struct CommitHooks {
287    hooks: HashMap<TypeId, Vec<Box<dyn DynHook>>>,
288}
289
290impl CommitHooks {
291    pub fn new() -> Self {
292        Self {
293            hooks: HashMap::new(),
294        }
295    }
296
297    pub(super) fn add<H: CommitHook>(&mut self, hook: H) {
298        let type_id = TypeId::of::<H>();
299        let hooks_vec = self.hooks.entry(type_id).or_default();
300
301        let mut new_hook: Box<dyn DynHook> = Box::new(hook);
302
303        if let Some(last) = hooks_vec.last_mut()
304            && last.try_merge(new_hook.as_mut())
305        {
306            return;
307        }
308
309        hooks_vec.push(new_hook);
310    }
311
312    pub(super) async fn execute_pre(
313        mut self,
314        op: &mut impl AtomicOperation,
315    ) -> Result<PostCommitHooks, sqlx::Error> {
316        let mut op = HookOperation::new(op);
317        let mut post_hooks = Vec::new();
318
319        for (_, hooks_vec) in self.hooks.drain() {
320            for hook in hooks_vec {
321                let (new_op, hook) = hook.pre_commit_boxed(op).await?;
322                op = new_op;
323                post_hooks.push(hook);
324            }
325        }
326
327        Ok(PostCommitHooks { hooks: post_hooks })
328    }
329}
330
331impl Default for CommitHooks {
332    fn default() -> Self {
333        Self::new()
334    }
335}
336
337pub struct PostCommitHooks {
338    hooks: Vec<Box<dyn DynHook>>,
339}
340
341impl PostCommitHooks {
342    pub(super) fn execute(self) {
343        for hook in self.hooks {
344            hook.post_commit_boxed();
345        }
346    }
347}