pgx 0.7.0

pgx: A Rust framework for creating Postgres extensions
Documentation
/*
Portions Copyright 2019-2021 ZomboDB, LLC.
Portions Copyright 2021-2022 Technology Concepts & Design, Inc. <support@tcdi.com>

All rights reserved.

Use of this source code is governed by the MIT license that can be found in the LICENSE file.
*/

//! Provides safe wrappers around Postgres' "Transaction" and "Sub Transaction" hook system

use crate as pgx; // for #[pg_guard] support from within ourself
use crate::pg_sys;
use crate::prelude::*;
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

/// The transaction-level (Xact) events a closure can be registered for
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PgXactCallbackEvent {
    /// The transaction has been aborted.  For any one transaction this event and
    /// `PgXactCallbackEvent::Commit` are mutually exclusive
    ///
    /// ## Safety
    ///
    /// A Rust `panic!()` or a Postgres `ereport(ERROR)` raised while this event is firing will
    /// cause the Postgres backend to abort.
    Abort,

    /// The transaction has been committed.  For any one transaction this event and
    /// `PgXactCallbackEvent::Abort` are mutually exclusive
    ///
    /// ## Safety
    ///
    /// A Rust `panic!()` or a Postgres `ereport(ERROR)` raised while this event is firing will
    /// cause the Postgres backend to abort.
    Commit,

    /// The transaction is just about to commit.  This is the last point at which the current
    /// transaction can still be cleanly aborted via a Rust `panic!()` or Postgres `ereport(ERROR)`
    PreCommit,

    /// The parallel-worker flavor of `::Abort`
    ParallelAbort,

    /// The parallel-worker flavor of `::Commit`
    ParallelCommit,

    /// The parallel-worker flavor of `::PreCommit`
    ParallelPreCommit,

    /// The prepared-transaction flavor of `::Commit` (Postgres' `XACT_EVENT_PREPARE`)
    Prepare,

    /// The prepared-transaction flavor of `::PreCommit` (Postgres' `XACT_EVENT_PRE_PREPARE`)
    PrePrepare,
}

impl PgXactCallbackEvent {
    /// Maps a raw Postgres `XactEvent` value onto the matching `PgXactCallbackEvent` variant.
    ///
    /// ## Panics
    ///
    /// Panics when `pg_event` is not one of the `XactEvent` constants matched below.
    fn translate_pg_event(pg_event: pg_sys::XactEvent) -> Self {
        match pg_event {
            // regular backends
            pg_sys::XactEvent_XACT_EVENT_PRE_COMMIT => Self::PreCommit,
            pg_sys::XactEvent_XACT_EVENT_COMMIT => Self::Commit,
            pg_sys::XactEvent_XACT_EVENT_ABORT => Self::Abort,

            // parallel workers
            pg_sys::XactEvent_XACT_EVENT_PARALLEL_PRE_COMMIT => Self::ParallelPreCommit,
            pg_sys::XactEvent_XACT_EVENT_PARALLEL_COMMIT => Self::ParallelCommit,
            pg_sys::XactEvent_XACT_EVENT_PARALLEL_ABORT => Self::ParallelAbort,

            // prepared transactions
            pg_sys::XactEvent_XACT_EVENT_PRE_PREPARE => Self::PrePrepare,
            pg_sys::XactEvent_XACT_EVENT_PREPARE => Self::Prepare,

            // anything else is a value we don't know how to represent
            other => panic!("Unrecognized XactEvent: {}", other),
        }
    }
}

/// The handle returned when a transaction event callback is registered.  It can be used to
/// unregister that callback should it become unnecessary later on (within the confines of the
/// current transaction)
pub struct XactCallbackReceipt(Rc<RefCell<Option<XactCallbackWrapper>>>);

impl XactCallbackReceipt {
    /// Unregisters the callback this `XactCallbackReceipt` stands for, consuming the receipt
    ///
    /// ## Examples
    ///
    /// ```rust,no_run
    /// use pgx::*;
    ///
    /// let receipt = register_xact_callback(PgXactCallbackEvent::Commit, || info!("called after commit"));
    ///
    /// let no_longer_necessary = true;
    ///
    /// if no_longer_necessary {
    ///     receipt.unregister_callback();
    /// }
    /// ```
    pub fn unregister_callback(self) {
        let Self(slot) = self;

        // the slot is shared with the callback map; emptying it here means whoever looks at
        // it afterwards finds `None` in place of the closure
        drop(slot.take());
    }
}

/// Private newtype around the boxed, run-once closure supplied by the user
struct XactCallbackWrapper(
    Box<dyn FnOnce() + std::panic::UnwindSafe + std::panic::RefUnwindSafe + 'static>,
);

/// Shorthand for the type representing the map of callbacks: for each event, the closures
/// registered for it, each held in a shared cell that can be emptied independently of the map
type CallbackMap = HashMap<PgXactCallbackEvent, Vec<Rc<RefCell<Option<XactCallbackWrapper>>>>>;

/// Register a closure to be called during one of the `PgXactCallbackEvent` events.  Multiple
/// closures can be registered per event (one at a time), and they are called in the order in which
/// they were registered.
///
/// Registered callbacks only remain registered for the life of a single transaction.  Registration
/// of permanent callbacks should be done through the unsafe `pg_sys::RegisterXactCallback()` function.
///
/// The returned `XactCallbackReceipt` can be used to unregister the closure before it has fired.
///
/// ## Examples
///
/// Register a number of events for pre-commit and commit:
///
/// ```rust,no_run
/// use pgx::*;
///
/// register_xact_callback(PgXactCallbackEvent::PreCommit, || info!("pre-commit #1"));
/// register_xact_callback(PgXactCallbackEvent::PreCommit, || info!("pre-commit #2"));
/// register_xact_callback(PgXactCallbackEvent::PreCommit, || info!("pre-commit #3"));
/// register_xact_callback(PgXactCallbackEvent::Commit, || info!("called after commit"));
/// ```
///
/// Register an event, do some work, and then decide the callback isn't actually necessary anymore:
///
/// ```rust,no_run
/// use pgx::*;
///
/// // ... do some initialization work ...
///
/// let receipt = register_xact_callback(PgXactCallbackEvent::Abort, || { /* do cleanup if xact aborts */});
///
/// // ... do work that might abort the transaction ...
///
/// // if we got here, the transaction did not abort, so we no longer need to care about cleanup
/// receipt.unregister_callback();
/// ```
///
/// ## Safety
///
/// Any kind of Rust `panic!()` or Postgres `ereport(ERROR)` while executing a `PgXactCallbackEvent::Commit`
/// or `PgXactCallbackEvent::Abort` event will immediately cause the Postgres backend to abort and
/// the entire cluster to restart.
///
/// As the Postgres internal documentation says:
///
/// At transaction end, the callback occurs post-commit or post-abort, so the callback
/// functions can only do noncritical cleanup.
pub fn register_xact_callback<F>(which_event: PgXactCallbackEvent, f: F) -> XactCallbackReceipt
where
    F: FnOnce() + std::panic::UnwindSafe + std::panic::RefUnwindSafe + 'static,
{
    // our map of xact callbacks.  It starts as None and gets initialized below in maybe_initialize()
    static mut XACT_HOOKS: Option<CallbackMap> = None;

    // internal function that we register as an XactCallback.  It translates the raw event,
    // pulls the hooks registered for that event out of XACT_HOOKS and runs them
    #[pg_guard]
    unsafe extern "C" fn callback(event: pg_sys::XactEvent, _arg: *mut ::std::os::raw::c_void) {
        let which_event = PgXactCallbackEvent::translate_pg_event(event);

        let hooks = match which_event {
            // pgx's XactCallbacks are per-transaction, so when the transaction is over
            // (that's either Commit or Abort, which are mutually exclusive), we replace our
            // static XACT_HOOKS with a new, empty Map so that subsequent transactions won't accidentally run
            // these hooks again.
            //
            // Note that we still run any hooks that are registered for these events in this xact
            //
            // NOTE(review): `Prepare` is not part of this list, so the map is not reset when it
            // fires.  If Postgres raises neither Commit nor Abort in this backend after a
            // successful PREPARE TRANSACTION (believed to be the case -- confirm against
            // xact.c), Commit/Abort hooks registered in that transaction would survive into the
            // next one.
            PgXactCallbackEvent::Commit
            | PgXactCallbackEvent::Abort
            | PgXactCallbackEvent::ParallelCommit
            | PgXactCallbackEvent::ParallelAbort => XACT_HOOKS
                .replace(HashMap::new())
                .expect("XACT_HOOKS was None during Commit/Abort")
                .remove(&which_event),

            // not in a transaction-end event, so just borrow our map and take out only the
            // hooks registered for this event
            _ => XACT_HOOKS.as_mut().expect("XACT_HOOKS was None").remove(&which_event),
        };

        // if we have a vec of hooks for this event they're consumed here and executed
        // in the order they were registered
        if let Some(hooks) = hooks {
            for hook in hooks.into_iter() {
                // TODO:  do we need to catch panics and do something with them?  They'll cause
                //  the Postgres backend to Abort() if we're handling XactEvent::Commit/Abort events

                // effectively 'take' the hook from the internal RefCell.  A hook that was
                // unregistered through its receipt is already None and gets skipped
                if let Some(hook) = hook.replace(None) {
                    // and execute it under guard for proper panic/elog(ERROR) handling
                    hook.0();
                }
            }
        }
    }

    // internal function to manage initialization of our transaction callback.  Returns a mutable
    // reference to the (possibly just created) map stored in XACT_HOOKS
    fn maybe_initialize<'a>() -> &'a mut CallbackMap {
        unsafe {
            // if this is our first time here since the Postgres backend started, XACT_HOOKS will be None
            if XACT_HOOKS.is_none() {
                // so lets swap it out with a new HashMap, which will live for the duration of the backend
                XACT_HOOKS.replace(HashMap::new());

                // and register our single callback function (internally defined above)
                pg_sys::RegisterXactCallback(Some(callback), std::ptr::null_mut());
            }

            XACT_HOOKS.as_mut().expect("XACT_HOOKS was None during maybe_initialize")
            // the expect() above should never fire: XACT_HOOKS was just checked/initialized
        }
    }

    // get a mutable reference to XACT_HOOKS
    let hooks = maybe_initialize();

    // wrap the user-provided closure as an optional, reference counted cell
    let wrapped_func = Rc::new(RefCell::new(Some(XactCallbackWrapper(Box::new(f)))));

    // find (or create) the map Entry for the specified event and add our wrapped hook to it
    let entry = hooks.entry(which_event).or_default();
    entry.push(Rc::clone(&wrapped_func));

    // give the user the ability to unregister
    XactCallbackReceipt(wrapped_func)
}

/// The sub-transaction (SubXact) events a closure can be registered for
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PgSubXactCallbackEvent {
    /// A subtransaction has been aborted.  Rust `panic!()`s and Postgres `ereport(ERROR)`s
    /// can occur while this event is firing, but doing so is not recommended
    AbortSub,

    /// A subtransaction has been committed.  Rust `panic!()`s and Postgres `ereport(ERROR)`s
    /// can occur while this event is firing, but doing so is not recommended
    CommitSub,

    /// A subtransaction is just about to commit.  This is the last chance to abort the
    /// subtransaction instead, before it really commits
    PreCommitSub,

    /// A subtransaction has just been created
    StartSub,
}

impl PgSubXactCallbackEvent {
    /// Maps a raw Postgres `SubXactEvent` value onto the matching `PgSubXactCallbackEvent` variant.
    ///
    /// ## Panics
    ///
    /// Panics when `event` is not one of the `SubXactEvent` constants matched below.
    fn translate_pg_event(event: pg_sys::SubXactEvent) -> Self {
        match event {
            pg_sys::SubXactEvent_SUBXACT_EVENT_START_SUB => Self::StartSub,
            pg_sys::SubXactEvent_SUBXACT_EVENT_PRE_COMMIT_SUB => Self::PreCommitSub,
            pg_sys::SubXactEvent_SUBXACT_EVENT_COMMIT_SUB => Self::CommitSub,
            pg_sys::SubXactEvent_SUBXACT_EVENT_ABORT_SUB => Self::AbortSub,

            // anything else is a value we don't know how to represent
            unknown => panic!("Unrecognized SubXactEvent: {}", unknown),
        }
    }
}

/// The handle returned when a sub-transaction event callback is registered.  This
/// `SubXactCallbackReceipt` can be used to unregister that callback should it become
/// unnecessary later on (within the confines of the current transaction)
pub struct SubXactCallbackReceipt(Rc<RefCell<Option<SubXactCallbackWrapper>>>);

impl SubXactCallbackReceipt {
    /// Unregisters the callback this `SubXactCallbackReceipt` stands for, consuming the receipt
    ///
    /// ## Examples
    ///
    /// ```rust,no_run
    /// use pgx::*;
    ///
    /// let receipt = register_subxact_callback(PgSubXactCallbackEvent::CommitSub, |my_subid, parent_subid| info!("called after commit-sub: {} {}", my_subid, parent_subid));
    ///
    /// let no_longer_necessary = true;
    ///
    /// if no_longer_necessary {
    ///     receipt.unregister_callback();
    /// }
    /// ```
    pub fn unregister_callback(self) {
        let Self(slot) = self;

        // the slot is shared with the callback map; emptying it here means whoever looks at
        // it afterwards finds `None` in place of the closure
        drop(slot.take());
    }
}

/// Private newtype around the boxed closure supplied by the user.  The closure is called with
/// two `pg_sys::SubTransactionId` arguments and, being `Fn`, can be called more than once
struct SubXactCallbackWrapper(
    Box<
        dyn Fn(pg_sys::SubTransactionId, pg_sys::SubTransactionId)
            + std::panic::UnwindSafe
            + std::panic::RefUnwindSafe
            + 'static,
    >,
);

/// Shorthand for the type representing the map of sub-transaction callbacks: for each event, the
/// closures registered for it, each held in a shared cell that can be emptied independently
/// of the map
type SubCallbackMap =
    HashMap<PgSubXactCallbackEvent, Vec<Rc<RefCell<Option<SubXactCallbackWrapper>>>>>;

/// Register a closure to be called during one of the `PgSubXactCallbackEvent` events.  Multiple
/// closures can be registered per event (one at a time), and they are called in the order in which
/// they were registered.
///
/// The closure is called with the two sub-transaction ids Postgres hands to the underlying
/// sub-transaction callback: the id the event is about, followed by the id of its parent.
///
/// Unlike closures registered with `register_xact_callback()`, a sub-transaction closure is not
/// consumed when it fires: it is called for every matching event until the top-level transaction
/// ends, or until it is unregistered through the returned `SubXactCallbackReceipt`.
///
/// ## Examples
///
/// ```rust,no_run
/// use pgx::*;
///
/// let receipt = register_subxact_callback(PgSubXactCallbackEvent::StartSub, |my_subid, parent_subid| {
///     info!("sub-transaction {} started under {}", my_subid, parent_subid)
/// });
///
/// // ... later on, if the callback is no longer necessary
/// receipt.unregister_callback();
/// ```
pub fn register_subxact_callback<F>(
    which_event: PgSubXactCallbackEvent,
    f: F,
) -> SubXactCallbackReceipt
where
    F: Fn(pg_sys::SubTransactionId, pg_sys::SubTransactionId)
        + std::panic::UnwindSafe
        + std::panic::RefUnwindSafe
        + 'static,
{
    // our map of subxact callbacks.  It is None until the first registration within a
    // transaction and is reset to None again when that transaction ends
    static mut SUB_HOOKS: Option<SubCallbackMap> = None;

    // internal function that we register as a SubXactCallback
    #[pg_guard]
    unsafe extern "C" fn callback(
        event: pg_sys::SubXactEvent,
        my_subid: pg_sys::SubTransactionId,
        parent_subid: pg_sys::SubTransactionId,
        _arg: *mut ::std::os::raw::c_void,
    ) {
        let which_event = PgSubXactCallbackEvent::translate_pg_event(event);

        // the hooks are only borrowed, never removed: the same hook has to fire again for the
        // next sub-transaction of the current top-level transaction
        let registered = SUB_HOOKS.as_ref().and_then(|all_hooks| all_hooks.get(&which_event));

        if let Some(registered) = registered {
            // run them in the order they were registered
            for hook in registered.iter() {
                // a hook that was unregistered through its receipt is None and gets skipped
                if let Some(hook) = hook.borrow().as_ref() {
                    (hook.0)(my_subid, parent_subid);
                }
            }
        }
    }

    // internal function to manage initialization of our sub-transaction callback.  Returns a
    // mutable reference to the (possibly just created) map stored in SUB_HOOKS
    fn maybe_initialize<'a>() -> &'a mut SubCallbackMap {
        unsafe {
            // SUB_HOOKS is None the first time we get here within a transaction
            if SUB_HOOKS.is_none() {
                SUB_HOOKS.replace(HashMap::new());

                // unregister previous callback registration.  It's okay if this is the first time
                pg_sys::UnregisterSubXactCallback(Some(callback), std::ptr::null_mut());

                // register our new callback
                pg_sys::RegisterSubXactCallback(Some(callback), std::ptr::null_mut());

                // register transaction callbacks so we can clear our hooks when the transaction ends
                // this is necessary b/c it's possible for the user to register sub transaction callbacks
                // within a transaction but a subtransaction never actually occurs
                //
                // Commit and Abort alone don't cover every way a transaction can end in this
                // backend:  a successfully prepared transaction finishes with Prepare, and
                // parallel workers see the Parallel* variants instead.  Missing any of them
                // would leave this transaction's hooks in SUB_HOOKS, where they would fire for
                // sub-transactions of later transactions
                for end_event in [
                    PgXactCallbackEvent::Commit,
                    PgXactCallbackEvent::Abort,
                    PgXactCallbackEvent::Prepare,
                    PgXactCallbackEvent::ParallelCommit,
                    PgXactCallbackEvent::ParallelAbort,
                ] {
                    register_xact_callback(end_event, || {
                        // reset SUB_HOOKS to None now that the outer transaction is over
                        SUB_HOOKS.take();
                    });
                }
            }

            // this expect() should never fire: SUB_HOOKS was just checked/initialized
            SUB_HOOKS.as_mut().expect("SUB_HOOKS was None during maybe_initialize()")
        }
    }

    // get a mutable reference to SUB_HOOKS
    let hooks = maybe_initialize();

    // wrap the user-provided closure as an optional, reference counted cell
    let wrapped_func = Rc::new(RefCell::new(Some(SubXactCallbackWrapper(Box::new(f)))));

    // find (or create) the map Entry for the specified event and add our wrapped hook to it
    hooks.entry(which_event).or_default().push(Rc::clone(&wrapped_func));

    // give the user the ability to unregister
    SubXactCallbackReceipt(wrapped_func)
}