Module hooks

Commit hooks for executing custom logic before and after transaction commits.

This module provides the CommitHook trait and supporting types for registering hooks that run during a transaction's commit lifecycle. Hooks are useful for:

  • Publishing events to message queues after successful commits
  • Updating caches
  • Triggering side effects that should only occur if the transaction succeeds
  • Accumulating operations across multiple entity updates in a transaction

§Hook Lifecycle

  1. Registration: Hooks are registered using AtomicOperation::add_commit_hook()
  2. Merging: Multiple hooks of the same type may be merged via CommitHook::merge()
  3. Pre-commit: CommitHook::pre_commit() executes before the transaction commits
  4. Commit: The underlying database transaction is committed
  5. Post-commit: CommitHook::post_commit() executes after successful commit
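The ordering of these five steps can be illustrated with a toy model that uses only the standard library. This is a sketch, not the es_entity implementation: `ToyHook`, `ToyOperation`, `Batch`, and `run_lifecycle` are hypothetical names, the "transaction" is a plain log of strings, and `pre_commit` borrows the hook instead of consuming it as the real trait does.

```rust
// Toy model of the hook lifecycle. All names here are hypothetical and
// exist only for illustration; none of them are part of es_entity.
trait ToyHook: Sized {
    fn pre_commit(&mut self, log: &mut Vec<String>) -> Result<(), String>;
    fn post_commit(self, log: &mut Vec<String>);
    // Returns true if `other` was absorbed into `self`.
    fn merge(&mut self, other: &mut Self) -> bool;
}

struct ToyOperation<H: ToyHook> {
    hooks: Vec<H>,
    log: Vec<String>,
}

impl<H: ToyHook> ToyOperation<H> {
    fn new() -> Self {
        Self { hooks: Vec::new(), log: Vec::new() }
    }

    // Steps 1 and 2: register a hook, first offering it to existing hooks for merging.
    fn add_commit_hook(&mut self, mut hook: H) {
        for existing in self.hooks.iter_mut() {
            if existing.merge(&mut hook) {
                return; // absorbed, nothing new to store
            }
        }
        self.hooks.push(hook);
    }

    // Steps 3 to 5: pre-commit, commit, post-commit.
    fn commit(self) -> Result<Vec<String>, String> {
        let ToyOperation { mut hooks, mut log } = self;
        for hook in hooks.iter_mut() {
            // An error here returns early, so the "commit" entry is never written.
            hook.pre_commit(&mut log)?;
        }
        log.push("commit".to_string());
        for hook in hooks {
            hook.post_commit(&mut log);
        }
        Ok(log)
    }
}

// A hook that batches string items and always agrees to merge.
struct Batch {
    items: Vec<String>,
}

impl ToyHook for Batch {
    fn pre_commit(&mut self, log: &mut Vec<String>) -> Result<(), String> {
        log.push(format!("pre_commit({})", self.items.join(",")));
        Ok(())
    }

    fn post_commit(self, log: &mut Vec<String>) {
        log.push(format!("post_commit({})", self.items.join(",")));
    }

    fn merge(&mut self, other: &mut Self) -> bool {
        self.items.append(&mut other.items);
        true
    }
}

// Registers two hooks and returns the order in which the lifecycle steps ran.
fn run_lifecycle() -> Vec<String> {
    let mut op = ToyOperation::new();
    op.add_commit_hook(Batch { items: vec!["a".to_string()] });
    op.add_commit_hook(Batch { items: vec!["b".to_string()] });
    op.commit().expect("toy pre_commit never fails")
}
```

Calling `run_lifecycle()` yields `["pre_commit(a,b)", "commit", "post_commit(a,b)"]`: the two registered hooks were merged into one, and that single hook ran once on each side of the commit.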

§Examples

§Hook with Database Operations and Channel-Based Publishing

This example shows a complete event publishing hook that:

  • Stores events in the database during pre-commit (within the transaction)
  • Sends events to a channel during post-commit for async processing
  • Merges multiple hook instances to batch operations

Note: post_commit() is synchronous and has no way to return an error, so it is best suited to fire-and-forget operations such as sending to a channel. A background task can then do the async work of publishing to external systems.

use es_entity::{AtomicOperation, operation::hooks::{CommitHook, HookOperation, PreCommitRet}};

#[derive(Debug, Clone)]
struct Event {
    entity_id: uuid::Uuid,
    event_type: String,
}

#[derive(Debug)]
struct EventPublisher {
    events: Vec<Event>,
    // Channel sender for publishing events to a background processor
    // In production, this might be tokio::sync::mpsc::Sender or similar
    tx: std::sync::mpsc::Sender<Event>,
}

impl CommitHook for EventPublisher {
    async fn pre_commit(self, mut op: HookOperation<'_>)
        -> Result<PreCommitRet<'_, Self>, sqlx::Error>
    {
        // Store events in the database within the transaction
        // If the transaction fails, these inserts will be rolled back
        for event in &self.events {
            sqlx::query!(
                "INSERT INTO hook_events (entity_id, event_type, created_at) VALUES ($1, $2, NOW())",
                event.entity_id,
                event.event_type
            )
            .execute(op.as_executor())
            .await?;
        }

        PreCommitRet::ok(self, op)
    }

    fn post_commit(self) {
        // Send events to a channel for async processing.
        // This only runs if the transaction succeeded.
        // Sends on this unbounded std channel never block; a background task handles publishing.
        for event in self.events {
            // send() fails only if the receiver was dropped; in production, log or count it.
            // With a bounded channel (for backpressure), prefer a non-blocking try_send()
            // so that post_commit() does not stall on a full buffer.
            let _ = self.tx.send(event);
        }
    }

    fn merge(&mut self, other: &mut Self) -> bool {
        // Merge multiple EventPublisher hooks into one to batch operations
        self.events.append(&mut other.events);
        true
    }
}

// Separate background task for async event publishing
// async fn event_publisher_task(mut rx: tokio::sync::mpsc::Receiver<Event>) {
//     while let Some(event) = rx.recv().await {
//         // Publish to Kafka, RabbitMQ, webhooks, etc.
//         // Handle failures with retries, dead-letter queues, etc.
//         match publish_to_external_system(&event).await {
//             Ok(_) => log::info!("Published event: {:?}", event),
//             Err(e) => log::error!("Failed to publish event: {:?}", e),
//         }
//     }
// }
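The commented-out background task above assumes a tokio runtime. The same hand-off can be sketched with only the standard library, using a thread as the consumer. This is a minimal illustration, not part of es_entity: `publish` and `run_pipeline` are hypothetical helpers, `publish` merely formats a string where a real system would call a broker or webhook, and `Event` is a trimmed copy of the struct above.

```rust
use std::sync::mpsc;
use std::thread;

// Trimmed copy of the Event type from the example above.
#[derive(Debug, Clone)]
struct Event {
    event_type: String,
}

// Hypothetical stand-in for publishing to Kafka, RabbitMQ, a webhook, etc.
fn publish(event: &Event) -> String {
    format!("published {}", event.event_type)
}

// Sends `events` through a channel to a worker thread and returns what the
// worker published, in order.
fn run_pipeline(events: Vec<Event>) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<Event>();

    // Background worker: iterating the receiver ends once every sender is dropped.
    let worker = thread::spawn(move || {
        let mut published = Vec::new();
        for event in rx {
            published.push(publish(&event));
        }
        published
    });

    // This loop plays the role of post_commit(): a non-blocking, fire-and-forget send.
    for event in events {
        if tx.send(event).is_err() {
            // The receiver is gone; a real hook might log this or bump a metric.
            break;
        }
    }

    // Dropping the last sender lets the worker's loop terminate.
    drop(tx);
    worker.join().expect("worker thread panicked")
}
```

Because a single sender feeds the channel, the worker sees events in the order they were sent. Dropping `tx` before `join()` matters: without it, the worker would wait for more events indefinitely.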

§Usage

let user_id = uuid::Uuid::nil();
let (tx, _rx) = std::sync::mpsc::channel();
let mut op = DbOp::init(&pool).await?;

// Add first hook
op.add_commit_hook(EventPublisher {
    events: vec![Event { entity_id: user_id, event_type: "user.created".to_string() }],
    tx: tx.clone(),
}).expect("could not add hook");

// Add second hook - will merge with the first
op.add_commit_hook(EventPublisher {
    events: vec![Event { entity_id: user_id, event_type: "email.sent".to_string() }],
    tx: tx.clone(),
}).expect("could not add hook");

// Both hooks merge into one; the events are stored in the database, then sent to the channel
op.commit().await?;

Structs§

HookOperation
Wrapper around a database connection passed to CommitHook::pre_commit().
PostCommitHooks
PreCommitRet
Return type for CommitHook::pre_commit().

Traits§

CommitHook
Trait for implementing custom commit hooks that execute before and after transaction commits.

Type Aliases§

BoxFuture
Type alias for boxed async futures.