supabase_wrappers/
utils.rs

1//! Helper functions for working with Wrappers
2//!
3
4use crate::interface::{Cell, Column, Row};
5use pgrx::{
6    list::List,
7    pg_sys::panic::{ErrorReport, ErrorReportable},
8    spi::Spi,
9    IntoDatum, *,
10};
11use std::ffi::c_void;
12use std::ffi::CStr;
13use std::num::NonZeroUsize;
14use std::ptr;
15use thiserror::Error;
16use tokio::runtime::{Builder, Runtime};
17use uuid::Uuid;
18
19/// Log debug message to Postgres log.
20///
21/// A helper function to emit `DEBUG1` level message to Postgres's log.
22/// Set `log_min_messages = DEBUG1` in `postgresql.conf` to show the debug
23/// messages.
24///
25/// See more details in [Postgres documents](https://www.postgresql.org/docs/current/runtime-config-logging.html#RUNTIME-CONFIG-LOGGING-WHEN).
26#[inline]
27pub fn log_debug1(msg: &str) {
28    debug1!("wrappers: {}", msg);
29}
30
31/// Report info to Postgres using `ereport!`
32///
33/// A simple wrapper of Postgres's `ereport!` function to emit info message.
34///
35/// For example,
36///
37/// ```rust,no_run
38/// # use supabase_wrappers::prelude::report_info;
39/// report_info(&format!("this is an info"));
40/// ```
41#[inline]
42pub fn report_info(msg: &str) {
43    ereport!(
44        PgLogLevel::INFO,
45        PgSqlErrorCode::ERRCODE_SUCCESSFUL_COMPLETION,
46        msg,
47        "Wrappers"
48    );
49}
50
51/// Report notice to Postgres using `ereport!`
52///
53/// A simple wrapper of Postgres's `ereport!` function to emit notice message.
54///
55/// For example,
56///
57/// ```rust,no_run
58/// # use supabase_wrappers::prelude::report_notice;
59/// report_notice(&format!("this is a notice"));
60/// ```
61#[inline]
62pub fn report_notice(msg: &str) {
63    ereport!(
64        PgLogLevel::NOTICE,
65        PgSqlErrorCode::ERRCODE_SUCCESSFUL_COMPLETION,
66        msg,
67        "Wrappers"
68    );
69}
70
71/// Report warning to Postgres using `ereport!`
72///
73/// A simple wrapper of Postgres's `ereport!` function to emit warning message.
74///
75/// For example,
76///
77/// ```rust,no_run
78/// # use supabase_wrappers::prelude::report_warning;
79/// report_warning(&format!("this is a warning"));
80/// ```
81#[inline]
82pub fn report_warning(msg: &str) {
83    ereport!(
84        PgLogLevel::WARNING,
85        PgSqlErrorCode::ERRCODE_WARNING,
86        msg,
87        "Wrappers"
88    );
89}
90
91/// Report error to Postgres using `ereport!`
92///
93/// A simple wrapper of Postgres's `ereport!` function to emit error message and
94/// aborts the current transaction.
95///
96/// For example,
97///
98/// ```rust,no_run
99/// # use supabase_wrappers::prelude::report_error;
100/// use pgrx::prelude::PgSqlErrorCode;
101///
102/// report_error(
103///     PgSqlErrorCode::ERRCODE_FDW_INVALID_COLUMN_NUMBER,
104///     &format!("target column number not match"),
105/// );
106/// ```
107#[inline]
108pub fn report_error(code: PgSqlErrorCode, msg: &str) {
109    ereport!(PgLogLevel::ERROR, code, msg, "Wrappers");
110}
111
112#[derive(Error, Debug)]
113pub enum CreateRuntimeError {
114    #[error("failed to create async runtime: {0}")]
115    FailedToCreateAsyncRuntime(#[from] std::io::Error),
116}
117
118impl From<CreateRuntimeError> for ErrorReport {
119    fn from(value: CreateRuntimeError) -> Self {
120        let error_message = format!("{value}");
121        ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, error_message, "")
122    }
123}
124
125/// Create a Tokio async runtime
126///
127/// Use this runtime to run async code in `block` mode. Run blocked code is
128/// required by Postgres callback functions which is fine because Postgres
129/// process is single-threaded.
130///
131/// For example,
132///
133/// ```rust,no_run
134/// # use supabase_wrappers::utils::CreateRuntimeError;
135/// # fn main() -> Result<(), CreateRuntimeError> {
136/// # use supabase_wrappers::prelude::create_async_runtime;
137/// # struct Client {
138/// # }
139/// # impl Client {
140/// #     async fn query(&self, _sql: &str) -> Result<(), ()> { Ok(()) }
141/// # }
142/// # let client = Client {};
143/// # let sql = "";
144/// let rt = create_async_runtime()?;
145///
146/// // client.query() is an async function returning a Result
147/// match rt.block_on(client.query(&sql)) {
148///     Ok(result) => { }
149///     Err(err) => { }
150/// }
151/// # Ok(())
152/// # }
153/// ```
154#[inline]
155pub fn create_async_runtime() -> Result<Runtime, CreateRuntimeError> {
156    Ok(Builder::new_current_thread().enable_all().build()?)
157}
158
159/// Get decrypted secret from Vault by secret ID
160///
161/// Get decrypted secret as string from Vault by secret ID. Vault is an extension for storing
162/// encrypted secrets, [see more details](https://github.com/supabase/vault).
163pub fn get_vault_secret(secret_id: &str) -> Option<String> {
164    match Uuid::try_parse(secret_id) {
165        Ok(sid) => {
166            let sid = sid.into_bytes();
167            match Spi::get_one_with_args::<String>(
168                "select decrypted_secret from vault.decrypted_secrets where id = $1 or key_id = $1",
169                &[pgrx::Uuid::from_bytes(sid).into()],
170            ) {
171                Ok(decrypted) => decrypted,
172                Err(err) => {
173                    report_error(
174                        PgSqlErrorCode::ERRCODE_FDW_ERROR,
175                        &format!("query vault failed \"{}\": {}", secret_id, err),
176                    );
177                    None
178                }
179            }
180        }
181        Err(err) => {
182            report_error(
183                PgSqlErrorCode::ERRCODE_FDW_ERROR,
184                &format!("invalid secret id \"{}\": {}", secret_id, err),
185            );
186            None
187        }
188    }
189}
190
191/// Get decrypted secret from Vault by secret name
192///
193/// Get decrypted secret as string from Vault by secret name. Vault is an extension for storing
194/// encrypted secrets, [see more details](https://github.com/supabase/vault).
195pub fn get_vault_secret_by_name(secret_name: &str) -> Option<String> {
196    match Spi::get_one_with_args::<String>(
197        "select decrypted_secret from vault.decrypted_secrets where name = $1",
198        &[secret_name.into()],
199    ) {
200        Ok(decrypted) => decrypted,
201        Err(err) => {
202            report_error(
203                PgSqlErrorCode::ERRCODE_FDW_ERROR,
204                &format!("query vault failed \"{}\": {}", secret_name, err),
205            );
206            None
207        }
208    }
209}
210
211pub(super) unsafe fn tuple_table_slot_to_row(slot: *mut pg_sys::TupleTableSlot) -> Row {
212    let tup_desc = PgTupleDesc::from_pg_copy((*slot).tts_tupleDescriptor);
213
214    let mut should_free = false;
215    let htup = pg_sys::ExecFetchSlotHeapTuple(slot, false, &mut should_free);
216    let htup = PgBox::from_pg(htup);
217    let mut row = Row::new();
218
219    for (att_idx, attr) in tup_desc.iter().filter(|a| !a.attisdropped).enumerate() {
220        let col = pgrx::name_data_to_str(&attr.attname);
221        let attno = NonZeroUsize::new(att_idx + 1).unwrap();
222        let cell: Option<Cell> = pgrx::htup::heap_getattr(&htup, attno, &tup_desc);
223        row.push(col, cell);
224    }
225
226    row
227}
228
229// extract target column name and attribute no list
230pub(super) unsafe fn extract_target_columns(
231    root: *mut pg_sys::PlannerInfo,
232    baserel: *mut pg_sys::RelOptInfo,
233) -> Vec<Column> {
234    let mut ret = Vec::new();
235    let mut col_vars: *mut pg_sys::List = ptr::null_mut();
236
237    memcx::current_context(|mcx| {
238        // gather vars from target column list
239        if let Some(tgt_list) =
240            List::<*mut c_void>::downcast_ptr_in_memcx((*(*baserel).reltarget).exprs, mcx)
241        {
242            for tgt in tgt_list.iter() {
243                let tgt_cols = pg_sys::pull_var_clause(
244                    *tgt as _,
245                    (pg_sys::PVC_RECURSE_AGGREGATES | pg_sys::PVC_RECURSE_PLACEHOLDERS)
246                        .try_into()
247                        .unwrap(),
248                );
249                col_vars = pg_sys::list_union(col_vars, tgt_cols);
250            }
251        }
252
253        // gather vars from restrictions
254        if let Some(conds) =
255            List::<*mut c_void>::downcast_ptr_in_memcx((*baserel).baserestrictinfo, mcx)
256        {
257            for cond in conds.iter() {
258                let expr = (*(*cond as *mut pg_sys::RestrictInfo)).clause;
259                let tgt_cols = pg_sys::pull_var_clause(
260                    expr as _,
261                    (pg_sys::PVC_RECURSE_AGGREGATES | pg_sys::PVC_RECURSE_PLACEHOLDERS)
262                        .try_into()
263                        .unwrap(),
264                );
265                col_vars = pg_sys::list_union(col_vars, tgt_cols);
266            }
267        }
268
269        // get column names from var list
270        if let Some(col_vars) = List::<*mut c_void>::downcast_ptr_in_memcx(col_vars, mcx) {
271            for var in col_vars.iter() {
272                let var: pg_sys::Var = *(*var as *mut pg_sys::Var);
273                let rte = pg_sys::planner_rt_fetch(var.varno as _, root);
274                let attno = var.varattno;
275                let attname = pg_sys::get_attname((*rte).relid, attno, true);
276                if !attname.is_null() {
277                    // generated column is not supported
278                    if pg_sys::get_attgenerated((*rte).relid, attno) > 0 {
279                        report_warning("generated column is not supported");
280                        continue;
281                    }
282
283                    let type_oid = pg_sys::get_atttype((*rte).relid, attno);
284                    ret.push(Column {
285                        name: CStr::from_ptr(attname).to_str().unwrap().to_owned(),
286                        num: attno as usize,
287                        type_oid,
288                    });
289                }
290            }
291        }
292    });
293
294    ret
295}
296
297// trait for "serialize" and "deserialize" state from specified memory context,
298// so that it is safe to be carried between the planning and the execution
299pub(super) trait SerdeList {
300    unsafe fn serialize_to_list(state: PgBox<Self>) -> *mut pg_sys::List
301    where
302        Self: Sized,
303    {
304        let ret = memcx::current_context(|mcx| {
305            let mut ret = List::<*mut c_void>::Nil;
306            let val = state.into_pg() as i64;
307            let cst: *mut pg_sys::Const = pg_sys::makeConst(
308                pg_sys::INT8OID,
309                -1,
310                pg_sys::InvalidOid,
311                8,
312                val.into_datum().unwrap(),
313                false,
314                true,
315            );
316            ret.unstable_push_in_context(cst as _, mcx);
317            ret.into_ptr()
318        });
319
320        ret
321    }
322
323    unsafe fn deserialize_from_list(list: *mut pg_sys::List) -> PgBox<Self>
324    where
325        Self: Sized,
326    {
327        memcx::current_context(|mcx| {
328            if let Some(list) = List::<*mut c_void>::downcast_ptr_in_memcx(list, mcx) {
329                if let Some(cst) = list.get(0) {
330                    let cst = *(*cst as *mut pg_sys::Const);
331                    let ptr = i64::from_datum(cst.constvalue, cst.constisnull).unwrap();
332                    return PgBox::<Self>::from_pg(ptr as _);
333                }
334            }
335            PgBox::<Self>::null()
336        })
337    }
338}
339
340pub(crate) trait ReportableError {
341    type Output;
342
343    fn report_unwrap(self) -> Self::Output;
344}
345
346impl<T, E: Into<ErrorReport>> ReportableError for Result<T, E> {
347    type Output = T;
348
349    fn report_unwrap(self) -> Self::Output {
350        self.map_err(|e| e.into()).unwrap_or_report()
351    }
352}