1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372
// Copyright 2020 ZomboDB, LLC <zombodb@gmail.com>. All rights reserved. Use of this source code is // governed by the MIT license that can be found in the LICENSE file. //! Provides safe wrappers around Postgres' "Transaction" and "Sub Transaction" hook system use crate::pg_sys; use std::cell::RefCell; use std::collections::HashMap; use std::rc::Rc; /// Postgres Transaction (Xact) Callback Events #[derive(Hash, Eq, PartialEq, Clone, Debug)] pub enum PgXactCallbackEvent { /// Fired when a transaction is aborted. It is mutually exclusive with `PgXactCallbackEvent::Commit` /// /// ## Safety /// /// Any kind of Rust `panic!()` or Postgres `ereport(ERROR)` while this event is firing will /// cause the Postgres backend to abort. Abort, /// Fired when a transcation is committed. It is mutually exclusive with `PgXactCallbackEvent::Abort` /// /// ## Safety /// /// Any kind of Rust `panic!()` or Postgres `ereport(ERROR)` while this event is firing will /// cause the Postgres backend to abort. Commit, /// Fired immediately before the transaction is committed. This is your last chance to cleanly /// abort the current transaction via a Rust `panic!()` or Postgres `ereport(ERROR)` PreCommit, /// Same as `::Abort`, but for parallel workers ParallelAbort, /// Same as `::Commit`, but for parallel workers ParallelCommit, /// Same as `::PreCommit`, but for parallel workers ParallelPreCommit, /// Same as `::Commit`, but for committing a prepared transaction Prepare, /// Same as `::PreCommit`, but for committing a prepared transaction PrePrepare, } impl PgXactCallbackEvent { fn translate_pg_event(pg_event: pg_sys::XactEvent) -> Self { match pg_event { pg_sys::XactEvent_XACT_EVENT_ABORT => PgXactCallbackEvent::Abort, pg_sys::XactEvent_XACT_EVENT_COMMIT => PgXactCallbackEvent::Commit, pg_sys::XactEvent_XACT_EVENT_PARALLEL_ABORT => PgXactCallbackEvent::ParallelAbort, pg_sys::XactEvent_XACT_EVENT_PARALLEL_COMMIT => PgXactCallbackEvent::ParallelCommit, pg_sys::XactEvent_XACT_EVENT_PARALLEL_PRE_COMMIT => { PgXactCallbackEvent::ParallelPreCommit } pg_sys::XactEvent_XACT_EVENT_PREPARE => PgXactCallbackEvent::Prepare, pg_sys::XactEvent_XACT_EVENT_PRE_COMMIT => PgXactCallbackEvent::PreCommit, pg_sys::XactEvent_XACT_EVENT_PRE_PREPARE => PgXactCallbackEvent::PrePrepare, unknown => panic!("Unrecognized XactEvent: {}", unknown), } } } /// Registering a transaction event callback returns a `XactCallbackReceipt` that can be used /// to unregister the callback if it later (within the confines of the current transaction) /// becomes unnecessary pub struct XactCallbackReceipt(Rc<RefCell<Option<XactCallbackWrapper>>>); impl XactCallbackReceipt { /// Consumes this `XactCallbackReceipt` and unregisters the registered callback it represents /// /// ## Examples /// /// ```rust,no_run /// use pgx::*; /// /// let receipt = register_xact_callback(PgXactCallbackEvent::Commit, || info!("called after commit")); /// /// let no_longer_necessary = true; /// /// if no_longer_necessary { /// receipt.unregister_callback(); /// } /// ``` pub fn unregister_callback(self) { self.0.replace(None); } } /// An internal wrapper for a callback closure struct XactCallbackWrapper( Box<dyn FnOnce() + std::panic::UnwindSafe + std::panic::RefUnwindSafe + 'static>, ); /// Shorthand for the type representing the map of callbacks type CallbackMap = HashMap<PgXactCallbackEvent, Vec<Rc<RefCell<Option<XactCallbackWrapper>>>>>; /// Register a closure to be called during one of the `PgXactCallbackEvent` events. Multiple /// closures can be registered per event (one at a time), and they are called in the order in which /// they were registered. /// /// Registered callbacks only remain registered for the life of a single transaction. Registration /// of permanet callbacks should be done through the unsafe `pg_sys::RegisterXactCallback()` function. /// /// /// ## Examples /// /// Register a number of events for pre-commit and commit: /// /// ```rust,no_run /// use pgx::*; /// /// register_xact_callback(PgXactCallbackEvent::PreCommit, || info!("pre-commit #1")); /// register_xact_callback(PgXactCallbackEvent::PreCommit, || info!("pre-commit #2")); /// register_xact_callback(PgXactCallbackEvent::PreCommit, || info!("pre-commit #3")); /// register_xact_callback(PgXactCallbackEvent::Commit, || info!("called after commit")); /// ``` /// /// Register an event, do some work, and then decide the callback isn't actually necessary anymore: /// /// ```rust,no_run /// use pgx::*; /// /// // ... do some initialization work ... /// /// let receipt = register_xact_callback(PgXactCallbackEvent::Abort, || { /* do cleanup if xact aborts */}); /// /// // ... do work that might abort the transaction ... /// /// // if we got here, the transaction did not abort, so we no longer need to care about cleanup /// receipt.unregister_callback(); /// ``` /// /// ## Safety /// /// Any kind of Rust `panic!()` or Postgres `ereport(ERROR)` while executing a `PgXactCallbackEvent::Commit` /// or `PgXactCallbackEvent::Abort` event will immediately cause the Postgres backend to abort and /// the entire cluster to restart. /// /// As the Postgres internal documentation says: /// /// At transaction end, the callback occurs post-commit or post-abort, so the callback /// functions can only do noncritical cleanup. pub fn register_xact_callback<F>(which_event: PgXactCallbackEvent, f: F) -> XactCallbackReceipt where F: FnOnce() + std::panic::UnwindSafe + std::panic::RefUnwindSafe + 'static, { // our map of xact callbacks. It starts as None and gets initialized below in maybe_initialize() static mut XACT_HOOKS: Option<CallbackMap> = None; // internal function that we register as an XactCallback unsafe extern "C" fn callback(event: pg_sys::XactEvent, _arg: *mut ::std::os::raw::c_void) { let which_event = PgXactCallbackEvent::translate_pg_event(event); let hooks = match which_event { // pgx's XactCallbacks are per-transaction, so when the transaction is over // (that's either Commit or Abort, which are mutually exclusive), we replace our // const XACT_HOOKS with a new, empty Map so that subsequent transactions won't accidentally run // these hooks again. // // Note that we still run any hooks that are registered for these events in this xact PgXactCallbackEvent::Commit | PgXactCallbackEvent::Abort | PgXactCallbackEvent::ParallelCommit | PgXactCallbackEvent::ParallelAbort => XACT_HOOKS .replace(HashMap::new()) .expect("XACT_HOOKS was None during Commit/Abort") .remove(&which_event), // not in a transaction-end event, so just borrow our map _ => XACT_HOOKS .as_mut() .expect("XACT_HOOKS was None") .remove(&which_event), }; // if we have a vec of hooks for this event they're consumed here and executed // in the order they were registered if let Some(hooks) = hooks { for hook in hooks.into_iter() { // TODO: do we need to catch panics and do something with them? They'll cause // the Postgres backend to Abort() if we're handling XactEvent::Commit/Abort events // effectively 'take' the hook from the internal RefCell if let Some(hook) = hook.replace(None) { // and execute it under guard for proper panic/elog(ERROR) handling crate::guard::guard(hook.0); } } } } // internal function to manage initialization of our transaction callback fn maybe_initialize<'a>() -> &'a mut CallbackMap { unsafe { // if this is our first time here since the Postgres backend started, XACT_HOOKS will be None if XACT_HOOKS.is_none() { // so lets swap it out with a new HashMap, which will live for the duration of the backend XACT_HOOKS.replace(HashMap::new()); // and register our single callback function (internally defined above) pg_sys::RegisterXactCallback(Some(callback), std::ptr::null_mut()); } XACT_HOOKS .as_mut() .expect("XACT_HOOKS was None during maybe_initialize") // this should never happen } } // get a mutable reference to XACT_HOOKS let hooks = maybe_initialize(); // wrap the user-provided closure as an optional, reference counted cell let wrapped_func = Rc::new(RefCell::new(Some(XactCallbackWrapper(Box::new(f))))); // find (or create) the map Entry for the specified event and add our wrapped hook to it let entry = hooks.entry(which_event).or_default(); entry.push(Rc::clone(&wrapped_func)); // give the user the ability to unregister XactCallbackReceipt(wrapped_func) } #[derive(Hash, Eq, PartialEq, Clone, Debug)] pub enum PgSubXactCallbackEvent { /// Fired when a subtransaction is aborted. While Rust `panic!()`s and Postgres `ereport(ERROR)`s /// can occur here, it's not recommended AbortSub, /// Fired when a subtransaction is committed. While Rust `panic!()`s and Postgres `ereport(ERROR)`s /// can occur here, it's not recommended CommitSub, /// Fired immediately before a subtransaction is committed. This is your last chance to instead /// abort the subtransaction before it really commits PreCommitSub, /// Fired when a subtransaction is first created StartSub, } impl PgSubXactCallbackEvent { fn translate_pg_event(event: pg_sys::SubXactEvent) -> Self { match event { pg_sys::SubXactEvent_SUBXACT_EVENT_ABORT_SUB => PgSubXactCallbackEvent::AbortSub, pg_sys::SubXactEvent_SUBXACT_EVENT_COMMIT_SUB => PgSubXactCallbackEvent::CommitSub, pg_sys::SubXactEvent_SUBXACT_EVENT_PRE_COMMIT_SUB => { PgSubXactCallbackEvent::PreCommitSub } pg_sys::SubXactEvent_SUBXACT_EVENT_START_SUB => PgSubXactCallbackEvent::StartSub, _ => panic!("Unrecognized SubXactEvent: {}", event), } } } /// Registering a sub-transaction event callback returns a `XactCallbackReceipt` that can be used /// to unregister the callback if it later (within the confines of the current transaction) /// becomes unnecessary pub struct SubXactCallbackReceipt(Rc<RefCell<Option<SubXactCallbackWrapper>>>); impl SubXactCallbackReceipt { /// Consumes this `SubXactCallbackReceipt` and unregisters the registered callback it represents /// /// ## Examples /// /// ```rust,no_run /// use pgx::*; /// /// let receipt = register_subxact_callback(PgSubXactCallbackEvent::CommitSub, |my_subid, parent_subid| info!("called after commit-sub: {} {}", my_subid, parent_subid)); /// /// let no_longer_necessary = true; /// /// if no_longer_necessary { /// receipt.unregister_callback(); /// } /// ``` pub fn unregister_callback(self) { self.0.replace(None); } } struct SubXactCallbackWrapper( Box< dyn Fn(pg_sys::SubTransactionId, pg_sys::SubTransactionId) + std::panic::UnwindSafe + std::panic::RefUnwindSafe + 'static, >, ); type SubCallbackMap = HashMap<PgSubXactCallbackEvent, Vec<Rc<RefCell<Option<SubXactCallbackWrapper>>>>>; pub fn register_subxact_callback<F>( which_event: PgSubXactCallbackEvent, f: F, ) -> SubXactCallbackReceipt where F: Fn(pg_sys::SubTransactionId, pg_sys::SubTransactionId) + std::panic::UnwindSafe + std::panic::RefUnwindSafe + 'static, { static mut SUB_HOOKS: Option<SubCallbackMap> = None; unsafe extern "C" fn callback( event: pg_sys::SubXactEvent, my_subid: pg_sys::SubTransactionId, parent_subid: pg_sys::SubTransactionId, _arg: *mut ::std::os::raw::c_void, ) { let which_event = PgSubXactCallbackEvent::translate_pg_event(event); let hooks = SUB_HOOKS.as_mut(); // if we have a vec of hooks for this event they're consumed here and executed // in the order they were registered if let Some(hooks) = hooks { let hooks = hooks.get(&which_event); if let Some(hooks) = hooks { for hook in hooks.iter() { let hook = hook.borrow(); if let Some(hook) = hook.as_ref() { crate::guard::guard(|| (hook.0)(my_subid, parent_subid)); } } } } } fn maybe_initialize<'a>() -> &'a mut SubCallbackMap { unsafe { if SUB_HOOKS.is_none() { SUB_HOOKS.replace(HashMap::new()); // unregister previous callback registration. It's okay if this is the first time pg_sys::UnregisterSubXactCallback(Some(callback), std::ptr::null_mut()); // register our new callback pg_sys::RegisterSubXactCallback(Some(callback), std::ptr::null_mut()); // register transaction callbacks so we can clear our hooks when the transaction ends // this is necessary b/c it's possible for the user to register sub transaction callbacks // within a transaction but a subtransaction never actually occurs register_xact_callback(PgXactCallbackEvent::Commit, || { // reset SUB_HOOKS to None on outer transaction COMMIT SUB_HOOKS.take(); }); register_xact_callback(PgXactCallbackEvent::Abort, || { // reset SUB_HOOKS to None on outer transaction ABORT SUB_HOOKS.take(); }); } SUB_HOOKS .as_mut() .expect("SUB_HOOKS was None during maybe_initialize()") // this should never happen } } let hooks = maybe_initialize(); let entry = hooks.entry(which_event).or_default(); let wrapped_func = Rc::new(RefCell::new(Some(SubXactCallbackWrapper(Box::new(f))))); entry.push(wrapped_func.clone()); SubXactCallbackReceipt(wrapped_func) }