1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
// Copyright 2020 ZomboDB, LLC <zombodb@gmail.com>. All rights reserved. Use of this source code is
// governed by the MIT license that can be found in the LICENSE file.

//! Provides safe wrappers around Postgres' "Transaction" and "Sub Transaction" hook system

use crate::pg_sys;
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

/// Postgres Transaction (Xact) Callback Events
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub enum PgXactCallbackEvent {
    /// Fired when a transaction is aborted.  It is mutually exclusive with `PgXactCallbackEvent::Commit`
    ///
    /// ## Safety
    ///
    /// Any kind of Rust `panic!()` or Postgres `ereport(ERROR)` while this event is firing will
    /// cause the Postgres backend to abort.
    Abort,

    /// Fired when a transcation is committed.  It is mutually exclusive with `PgXactCallbackEvent::Abort`
    ///
    /// ## Safety
    ///
    /// Any kind of Rust `panic!()` or Postgres `ereport(ERROR)` while this event is firing will
    /// cause the Postgres backend to abort.
    Commit,

    /// Fired immediately before the transaction is committed.  This is your last chance to cleanly
    /// abort the current transaction via a Rust `panic!()` or Postgres `ereport(ERROR)`
    PreCommit,

    /// Same as `::Abort`, but for parallel workers
    ParallelAbort,

    /// Same as `::Commit`, but for parallel workers
    ParallelCommit,

    /// Same as `::PreCommit`, but for parallel workers
    ParallelPreCommit,

    /// Same as `::Commit`, but for committing a prepared transaction
    Prepare,

    /// Same as `::PreCommit`, but for committing a prepared transaction
    PrePrepare,
}

impl PgXactCallbackEvent {
    fn translate_pg_event(pg_event: pg_sys::XactEvent) -> Self {
        match pg_event {
            pg_sys::XactEvent_XACT_EVENT_ABORT => PgXactCallbackEvent::Abort,
            pg_sys::XactEvent_XACT_EVENT_COMMIT => PgXactCallbackEvent::Commit,
            pg_sys::XactEvent_XACT_EVENT_PARALLEL_ABORT => PgXactCallbackEvent::ParallelAbort,
            pg_sys::XactEvent_XACT_EVENT_PARALLEL_COMMIT => PgXactCallbackEvent::ParallelCommit,
            pg_sys::XactEvent_XACT_EVENT_PARALLEL_PRE_COMMIT => {
                PgXactCallbackEvent::ParallelPreCommit
            }
            pg_sys::XactEvent_XACT_EVENT_PREPARE => PgXactCallbackEvent::Prepare,
            pg_sys::XactEvent_XACT_EVENT_PRE_COMMIT => PgXactCallbackEvent::PreCommit,
            pg_sys::XactEvent_XACT_EVENT_PRE_PREPARE => PgXactCallbackEvent::PrePrepare,
            unknown => panic!("Unrecognized XactEvent: {}", unknown),
        }
    }
}

/// Registering a transaction event callback returns a `XactCallbackReceipt` that can be used
/// to unregister the callback if it later (within the confines of the current transaction)
/// becomes unnecessary
pub struct XactCallbackReceipt(Rc<RefCell<Option<XactCallbackWrapper>>>);

impl XactCallbackReceipt {
    /// Consumes this `XactCallbackReceipt` and unregisters the registered callback it represents
    ///
    /// ## Examples
    ///
    /// ```rust,no_run
    /// use pgx::*;
    ///
    /// let receipt = register_xact_callback(PgXactCallbackEvent::Commit, || info!("called after commit"));
    ///
    /// let no_longer_necessary = true;
    ///
    /// if no_longer_necessary {
    ///     receipt.unregister_callback();
    /// }
    /// ```
    pub fn unregister_callback(self) {
        self.0.replace(None);
    }
}

/// An internal wrapper for a callback closure
struct XactCallbackWrapper(
    Box<dyn FnOnce() + std::panic::UnwindSafe + std::panic::RefUnwindSafe + 'static>,
);

/// Shorthand for the type representing the map of callbacks
type CallbackMap = HashMap<PgXactCallbackEvent, Vec<Rc<RefCell<Option<XactCallbackWrapper>>>>>;

/// Register a closure to be called during one of the `PgXactCallbackEvent` events.  Multiple
/// closures can be registered per event (one at a time), and they are called in the order in which
/// they were registered.
///
/// Registered callbacks only remain registered for the life of a single transaction.  Registration
/// of permanet callbacks should be done through the unsafe `pg_sys::RegisterXactCallback()` function.
///
///
/// ## Examples
///
/// Register a number of events for pre-commit and commit:
///
/// ```rust,no_run
/// use pgx::*;
///
/// register_xact_callback(PgXactCallbackEvent::PreCommit, || info!("pre-commit #1"));
/// register_xact_callback(PgXactCallbackEvent::PreCommit, || info!("pre-commit #2"));
/// register_xact_callback(PgXactCallbackEvent::PreCommit, || info!("pre-commit #3"));
/// register_xact_callback(PgXactCallbackEvent::Commit, || info!("called after commit"));
/// ```
///
/// Register an event, do some work, and then decide the callback isn't actually necessary anymore:
///
/// ```rust,no_run
/// use pgx::*;
///
/// // ... do some initialization work ...
///
/// let receipt = register_xact_callback(PgXactCallbackEvent::Abort, || { /* do cleanup if xact aborts */});
///
/// // ... do work that might abort the transaction ...
///
/// // if we got here, the transaction did not abort, so we no longer need to care about cleanup
/// receipt.unregister_callback();
/// ```
///
/// ## Safety
///
/// Any kind of Rust `panic!()` or Postgres `ereport(ERROR)` while executing a `PgXactCallbackEvent::Commit`
/// or `PgXactCallbackEvent::Abort` event will immediately cause the Postgres backend to abort and
/// the entire cluster to restart.
///
/// As the Postgres internal documentation says:  
///
/// At transaction end, the callback occurs post-commit or post-abort, so the callback
/// functions can only do noncritical cleanup.
pub fn register_xact_callback<F>(which_event: PgXactCallbackEvent, f: F) -> XactCallbackReceipt
where
    F: FnOnce() + std::panic::UnwindSafe + std::panic::RefUnwindSafe + 'static,
{
    // our map of xact callbacks.  It starts as None and gets initialized below in maybe_initialize()
    static mut XACT_HOOKS: Option<CallbackMap> = None;

    // internal function that we register as an XactCallback
    unsafe extern "C" fn callback(event: pg_sys::XactEvent, _arg: *mut ::std::os::raw::c_void) {
        let which_event = PgXactCallbackEvent::translate_pg_event(event);

        let hooks = match which_event {
            // pgx's XactCallbacks are per-transaction, so when the transaction is over
            // (that's either Commit or Abort, which are mutually exclusive), we replace our
            // const XACT_HOOKS with a new, empty Map so that subsequent transactions won't accidentally run
            // these hooks again.
            //
            // Note that we still run any hooks that are registered for these events in this xact
            PgXactCallbackEvent::Commit
            | PgXactCallbackEvent::Abort
            | PgXactCallbackEvent::ParallelCommit
            | PgXactCallbackEvent::ParallelAbort => XACT_HOOKS
                .replace(HashMap::new())
                .expect("XACT_HOOKS was None during Commit/Abort")
                .remove(&which_event),

            // not in a transaction-end event, so just borrow our map
            _ => XACT_HOOKS
                .as_mut()
                .expect("XACT_HOOKS was None")
                .remove(&which_event),
        };

        // if we have a vec of hooks for this event they're consumed here and executed
        // in the order they were registered
        if let Some(hooks) = hooks {
            for hook in hooks.into_iter() {
                // TODO:  do we need to catch panics and do something with them?  They'll cause
                //  the Postgres backend to Abort() if we're handling XactEvent::Commit/Abort events

                // effectively 'take' the hook from the internal RefCell
                if let Some(hook) = hook.replace(None) {
                    // and execute it under guard for proper panic/elog(ERROR) handling
                    crate::guard::guard(hook.0);
                }
            }
        }
    }

    // internal function to manage initialization of our transaction callback
    fn maybe_initialize<'a>() -> &'a mut CallbackMap {
        unsafe {
            // if this is our first time here since the Postgres backend started, XACT_HOOKS will be None
            if XACT_HOOKS.is_none() {
                // so lets swap it out with a new HashMap, which will live for the duration of the backend
                XACT_HOOKS.replace(HashMap::new());

                // and register our single callback function (internally defined above)
                pg_sys::RegisterXactCallback(Some(callback), std::ptr::null_mut());
            }

            XACT_HOOKS
                .as_mut()
                .expect("XACT_HOOKS was None during maybe_initialize") // this should never happen
        }
    }

    // get a mutable reference to XACT_HOOKS
    let hooks = maybe_initialize();

    // wrap the user-provided closure as an optional, reference counted cell
    let wrapped_func = Rc::new(RefCell::new(Some(XactCallbackWrapper(Box::new(f)))));

    // find (or create) the map Entry for the specified event and add our wrapped hook to it
    let entry = hooks.entry(which_event).or_default();
    entry.push(Rc::clone(&wrapped_func));

    // give the user the ability to unregister
    XactCallbackReceipt(wrapped_func)
}

#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub enum PgSubXactCallbackEvent {
    /// Fired when a subtransaction is aborted.  While Rust `panic!()`s and Postgres `ereport(ERROR)`s
    /// can occur here, it's not recommended
    AbortSub,

    /// Fired when a subtransaction is committed.  While Rust `panic!()`s and Postgres `ereport(ERROR)`s
    /// can occur here, it's not recommended
    CommitSub,

    /// Fired immediately before a subtransaction is committed.  This is your last chance to instead
    /// abort the subtransaction before it really commits
    PreCommitSub,

    /// Fired when a subtransaction is first created
    StartSub,
}

impl PgSubXactCallbackEvent {
    fn translate_pg_event(event: pg_sys::SubXactEvent) -> Self {
        match event {
            pg_sys::SubXactEvent_SUBXACT_EVENT_ABORT_SUB => PgSubXactCallbackEvent::AbortSub,
            pg_sys::SubXactEvent_SUBXACT_EVENT_COMMIT_SUB => PgSubXactCallbackEvent::CommitSub,
            pg_sys::SubXactEvent_SUBXACT_EVENT_PRE_COMMIT_SUB => {
                PgSubXactCallbackEvent::PreCommitSub
            }
            pg_sys::SubXactEvent_SUBXACT_EVENT_START_SUB => PgSubXactCallbackEvent::StartSub,
            _ => panic!("Unrecognized SubXactEvent: {}", event),
        }
    }
}

/// Registering a sub-transaction event callback returns a `XactCallbackReceipt` that can be used
/// to unregister the callback if it later (within the confines of the current transaction)
/// becomes unnecessary
pub struct SubXactCallbackReceipt(Rc<RefCell<Option<SubXactCallbackWrapper>>>);

impl SubXactCallbackReceipt {
    /// Consumes this `SubXactCallbackReceipt` and unregisters the registered callback it represents
    ///
    /// ## Examples
    ///
    /// ```rust,no_run
    /// use pgx::*;
    ///
    /// let receipt = register_subxact_callback(PgSubXactCallbackEvent::CommitSub, |my_subid, parent_subid| info!("called after commit-sub: {} {}", my_subid, parent_subid));
    ///
    /// let no_longer_necessary = true;
    ///
    /// if no_longer_necessary {
    ///     receipt.unregister_callback();
    /// }
    /// ```
    pub fn unregister_callback(self) {
        self.0.replace(None);
    }
}

struct SubXactCallbackWrapper(
    Box<
        dyn Fn(pg_sys::SubTransactionId, pg_sys::SubTransactionId)
            + std::panic::UnwindSafe
            + std::panic::RefUnwindSafe
            + 'static,
    >,
);

type SubCallbackMap =
    HashMap<PgSubXactCallbackEvent, Vec<Rc<RefCell<Option<SubXactCallbackWrapper>>>>>;

pub fn register_subxact_callback<F>(
    which_event: PgSubXactCallbackEvent,
    f: F,
) -> SubXactCallbackReceipt
where
    F: Fn(pg_sys::SubTransactionId, pg_sys::SubTransactionId)
        + std::panic::UnwindSafe
        + std::panic::RefUnwindSafe
        + 'static,
{
    static mut SUB_HOOKS: Option<SubCallbackMap> = None;

    unsafe extern "C" fn callback(
        event: pg_sys::SubXactEvent,
        my_subid: pg_sys::SubTransactionId,
        parent_subid: pg_sys::SubTransactionId,
        _arg: *mut ::std::os::raw::c_void,
    ) {
        let which_event = PgSubXactCallbackEvent::translate_pg_event(event);

        let hooks = SUB_HOOKS.as_mut();

        // if we have a vec of hooks for this event they're consumed here and executed
        // in the order they were registered
        if let Some(hooks) = hooks {
            let hooks = hooks.get(&which_event);
            if let Some(hooks) = hooks {
                for hook in hooks.iter() {
                    let hook = hook.borrow();
                    if let Some(hook) = hook.as_ref() {
                        crate::guard::guard(|| (hook.0)(my_subid, parent_subid));
                    }
                }
            }
        }
    }

    fn maybe_initialize<'a>() -> &'a mut SubCallbackMap {
        unsafe {
            if SUB_HOOKS.is_none() {
                SUB_HOOKS.replace(HashMap::new());

                // unregister previous callback registration.  It's okay if this is the first time
                pg_sys::UnregisterSubXactCallback(Some(callback), std::ptr::null_mut());

                // register our new callback
                pg_sys::RegisterSubXactCallback(Some(callback), std::ptr::null_mut());

                // register transaction callbacks so we can clear our hooks when the transaction ends
                // this is necessary b/c it's possible for the user to register sub transaction callbacks
                // within a transaction but a subtransaction never actually occurs
                register_xact_callback(PgXactCallbackEvent::Commit, || {
                    // reset SUB_HOOKS to None on outer transaction COMMIT
                    SUB_HOOKS.take();
                });
                register_xact_callback(PgXactCallbackEvent::Abort, || {
                    // reset SUB_HOOKS to None on outer transaction ABORT
                    SUB_HOOKS.take();
                });
            }

            SUB_HOOKS
                .as_mut()
                .expect("SUB_HOOKS was None during maybe_initialize()") // this should never happen
        }
    }

    let hooks = maybe_initialize();
    let entry = hooks.entry(which_event).or_default();
    let wrapped_func = Rc::new(RefCell::new(Some(SubXactCallbackWrapper(Box::new(f)))));
    entry.push(wrapped_func.clone());

    SubXactCallbackReceipt(wrapped_func)
}