supabase_wrappers/
utils.rs1use crate::interface::{Cell, Column, Row};
5use pgrx::{
6 list::List,
7 pg_sys::panic::{ErrorReport, ErrorReportable},
8 spi::Spi,
9 IntoDatum, *,
10};
11use std::ffi::c_void;
12use std::ffi::CStr;
13use std::num::NonZeroUsize;
14use std::ptr;
15use thiserror::Error;
16use tokio::runtime::{Builder, Runtime};
17use uuid::Uuid;
18
19#[inline]
27pub fn log_debug1(msg: &str) {
28 debug1!("wrappers: {}", msg);
29}
30
31#[inline]
42pub fn report_info(msg: &str) {
43 ereport!(
44 PgLogLevel::INFO,
45 PgSqlErrorCode::ERRCODE_SUCCESSFUL_COMPLETION,
46 msg,
47 "Wrappers"
48 );
49}
50
51#[inline]
62pub fn report_notice(msg: &str) {
63 ereport!(
64 PgLogLevel::NOTICE,
65 PgSqlErrorCode::ERRCODE_SUCCESSFUL_COMPLETION,
66 msg,
67 "Wrappers"
68 );
69}
70
71#[inline]
82pub fn report_warning(msg: &str) {
83 ereport!(
84 PgLogLevel::WARNING,
85 PgSqlErrorCode::ERRCODE_WARNING,
86 msg,
87 "Wrappers"
88 );
89}
90
91#[inline]
108pub fn report_error(code: PgSqlErrorCode, msg: &str) {
109 ereport!(PgLogLevel::ERROR, code, msg, "Wrappers");
110}
111
112#[derive(Error, Debug)]
113pub enum CreateRuntimeError {
114 #[error("failed to create async runtime: {0}")]
115 FailedToCreateAsyncRuntime(#[from] std::io::Error),
116}
117
118impl From<CreateRuntimeError> for ErrorReport {
119 fn from(value: CreateRuntimeError) -> Self {
120 let error_message = format!("{value}");
121 ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, error_message, "")
122 }
123}
124
125#[inline]
155pub fn create_async_runtime() -> Result<Runtime, CreateRuntimeError> {
156 Ok(Builder::new_current_thread().enable_all().build()?)
157}
158
159pub fn get_vault_secret(secret_id: &str) -> Option<String> {
164 match Uuid::try_parse(secret_id) {
165 Ok(sid) => {
166 let sid = sid.into_bytes();
167 match Spi::get_one_with_args::<String>(
168 "select decrypted_secret from vault.decrypted_secrets where id = $1 or key_id = $1",
169 &[pgrx::Uuid::from_bytes(sid).into()],
170 ) {
171 Ok(decrypted) => decrypted,
172 Err(err) => {
173 report_error(
174 PgSqlErrorCode::ERRCODE_FDW_ERROR,
175 &format!("query vault failed \"{}\": {}", secret_id, err),
176 );
177 None
178 }
179 }
180 }
181 Err(err) => {
182 report_error(
183 PgSqlErrorCode::ERRCODE_FDW_ERROR,
184 &format!("invalid secret id \"{}\": {}", secret_id, err),
185 );
186 None
187 }
188 }
189}
190
191pub fn get_vault_secret_by_name(secret_name: &str) -> Option<String> {
196 match Spi::get_one_with_args::<String>(
197 "select decrypted_secret from vault.decrypted_secrets where name = $1",
198 &[secret_name.into()],
199 ) {
200 Ok(decrypted) => decrypted,
201 Err(err) => {
202 report_error(
203 PgSqlErrorCode::ERRCODE_FDW_ERROR,
204 &format!("query vault failed \"{}\": {}", secret_name, err),
205 );
206 None
207 }
208 }
209}
210
211pub(super) unsafe fn tuple_table_slot_to_row(slot: *mut pg_sys::TupleTableSlot) -> Row {
212 let tup_desc = PgTupleDesc::from_pg_copy((*slot).tts_tupleDescriptor);
213
214 let mut should_free = false;
215 let htup = pg_sys::ExecFetchSlotHeapTuple(slot, false, &mut should_free);
216 let htup = PgBox::from_pg(htup);
217 let mut row = Row::new();
218
219 for (att_idx, attr) in tup_desc.iter().filter(|a| !a.attisdropped).enumerate() {
220 let col = pgrx::name_data_to_str(&attr.attname);
221 let attno = NonZeroUsize::new(att_idx + 1).unwrap();
222 let cell: Option<Cell> = pgrx::htup::heap_getattr(&htup, attno, &tup_desc);
223 row.push(col, cell);
224 }
225
226 row
227}
228
229pub(super) unsafe fn extract_target_columns(
231 root: *mut pg_sys::PlannerInfo,
232 baserel: *mut pg_sys::RelOptInfo,
233) -> Vec<Column> {
234 let mut ret = Vec::new();
235 let mut col_vars: *mut pg_sys::List = ptr::null_mut();
236
237 memcx::current_context(|mcx| {
238 if let Some(tgt_list) =
240 List::<*mut c_void>::downcast_ptr_in_memcx((*(*baserel).reltarget).exprs, mcx)
241 {
242 for tgt in tgt_list.iter() {
243 let tgt_cols = pg_sys::pull_var_clause(
244 *tgt as _,
245 (pg_sys::PVC_RECURSE_AGGREGATES | pg_sys::PVC_RECURSE_PLACEHOLDERS)
246 .try_into()
247 .unwrap(),
248 );
249 col_vars = pg_sys::list_union(col_vars, tgt_cols);
250 }
251 }
252
253 if let Some(conds) =
255 List::<*mut c_void>::downcast_ptr_in_memcx((*baserel).baserestrictinfo, mcx)
256 {
257 for cond in conds.iter() {
258 let expr = (*(*cond as *mut pg_sys::RestrictInfo)).clause;
259 let tgt_cols = pg_sys::pull_var_clause(
260 expr as _,
261 (pg_sys::PVC_RECURSE_AGGREGATES | pg_sys::PVC_RECURSE_PLACEHOLDERS)
262 .try_into()
263 .unwrap(),
264 );
265 col_vars = pg_sys::list_union(col_vars, tgt_cols);
266 }
267 }
268
269 if let Some(col_vars) = List::<*mut c_void>::downcast_ptr_in_memcx(col_vars, mcx) {
271 for var in col_vars.iter() {
272 let var: pg_sys::Var = *(*var as *mut pg_sys::Var);
273 let rte = pg_sys::planner_rt_fetch(var.varno as _, root);
274 let attno = var.varattno;
275 let attname = pg_sys::get_attname((*rte).relid, attno, true);
276 if !attname.is_null() {
277 if pg_sys::get_attgenerated((*rte).relid, attno) > 0 {
279 report_warning("generated column is not supported");
280 continue;
281 }
282
283 let type_oid = pg_sys::get_atttype((*rte).relid, attno);
284 ret.push(Column {
285 name: CStr::from_ptr(attname).to_str().unwrap().to_owned(),
286 num: attno as usize,
287 type_oid,
288 });
289 }
290 }
291 }
292 });
293
294 ret
295}
296
297pub(super) trait SerdeList {
300 unsafe fn serialize_to_list(state: PgBox<Self>) -> *mut pg_sys::List
301 where
302 Self: Sized,
303 {
304 let ret = memcx::current_context(|mcx| {
305 let mut ret = List::<*mut c_void>::Nil;
306 let val = state.into_pg() as i64;
307 let cst: *mut pg_sys::Const = pg_sys::makeConst(
308 pg_sys::INT8OID,
309 -1,
310 pg_sys::InvalidOid,
311 8,
312 val.into_datum().unwrap(),
313 false,
314 true,
315 );
316 ret.unstable_push_in_context(cst as _, mcx);
317 ret.into_ptr()
318 });
319
320 ret
321 }
322
323 unsafe fn deserialize_from_list(list: *mut pg_sys::List) -> PgBox<Self>
324 where
325 Self: Sized,
326 {
327 memcx::current_context(|mcx| {
328 if let Some(list) = List::<*mut c_void>::downcast_ptr_in_memcx(list, mcx) {
329 if let Some(cst) = list.get(0) {
330 let cst = *(*cst as *mut pg_sys::Const);
331 let ptr = i64::from_datum(cst.constvalue, cst.constisnull).unwrap();
332 return PgBox::<Self>::from_pg(ptr as _);
333 }
334 }
335 PgBox::<Self>::null()
336 })
337 }
338}
339
/// Unwrap a fallible value, reporting the failure case instead of returning it.
pub(crate) trait ReportableError {
    /// The value produced when unwrapping succeeds.
    type Output;

    /// Consume `self` and return its success value; how a failure is reported
    /// is up to the implementation.
    fn report_unwrap(self) -> Self::Output;
}
345
346impl<T, E: Into<ErrorReport>> ReportableError for Result<T, E> {
347 type Output = T;
348
349 fn report_unwrap(self) -> Self::Output {
350 self.map_err(|e| e.into()).unwrap_or_report()
351 }
352}