use pgrx::pg_sys::panic::ErrorReport;
use pgrx::{
FromDatum, IntoDatum, PgSqlErrorCode, debug2,
list::List,
memcxt::PgMemoryContexts,
pg_sys::{MemoryContext, MemoryContextData, Oid},
prelude::*,
rel::PgRelation,
tupdesc::PgTupleDesc,
};
use std::collections::HashMap;
use std::ffi::c_void;
use std::marker::PhantomData;
use std::os::raw::c_int;
use std::ptr;
use crate::prelude::*;
use super::instance;
use super::memctx;
use super::polyfill;
use super::utils;
struct FdwModifyPrivate {
foreigntableid: Oid,
rowid_name: String,
rowid_typid: Oid,
#[cfg(feature = "pg13")]
update_cols: Vec<String>,
}
impl FdwModifyPrivate {
unsafe fn serialize_to_list(&self) -> *mut pg_sys::List {
pgrx::memcx::current_context(|mcx| unsafe {
let mut ret = List::<*mut c_void>::Nil;
let cst = pg_sys::makeConst(
pg_sys::INT4OID,
-1,
pg_sys::InvalidOid,
4,
(self.foreigntableid.to_u32() as i32).into_datum().unwrap(),
false,
true,
);
ret.unstable_push_in_context(cst as _, mcx);
let cst = pg_sys::makeConst(
pg_sys::TEXTOID,
-1,
pg_sys::InvalidOid,
-1,
self.rowid_name.clone().into_datum().unwrap(),
false,
false,
);
ret.unstable_push_in_context(cst as _, mcx);
let cst = pg_sys::makeConst(
pg_sys::INT4OID,
-1,
pg_sys::InvalidOid,
4,
(self.rowid_typid.to_u32() as i32).into_datum().unwrap(),
false,
true,
);
ret.unstable_push_in_context(cst as _, mcx);
#[cfg(feature = "pg13")]
{
let cst = pg_sys::makeConst(
pg_sys::INT4OID,
-1,
pg_sys::InvalidOid,
4,
(self.update_cols.len() as i32).into_datum().unwrap(),
false,
true,
);
ret.unstable_push_in_context(cst as _, mcx);
for col in &self.update_cols {
let cst = pg_sys::makeConst(
pg_sys::TEXTOID,
-1,
pg_sys::InvalidOid,
-1,
col.clone().into_datum().unwrap(),
false,
false,
);
ret.unstable_push_in_context(cst as _, mcx);
}
}
ret.into_ptr()
})
}
unsafe fn deserialize_from_list(list: *mut pg_sys::List) -> Option<Self> {
pgrx::memcx::current_context(|mcx| unsafe {
let list = List::<*mut c_void>::downcast_ptr_in_memcx(list, mcx)?;
let cst_ptr = *list.get(0)? as *mut pg_sys::Const;
let cst = *cst_ptr;
let foreigntableid_i32 = i32::from_datum(cst.constvalue, cst.constisnull)?;
let foreigntableid = Oid::from(foreigntableid_i32 as u32);
let cst_ptr = *list.get(1)? as *mut pg_sys::Const;
let cst = *cst_ptr;
let rowid_name = String::from_datum(cst.constvalue, cst.constisnull)?;
let cst_ptr = *list.get(2)? as *mut pg_sys::Const;
let cst = *cst_ptr;
let rowid_typid_i32 = i32::from_datum(cst.constvalue, cst.constisnull)?;
let rowid_typid = Oid::from(rowid_typid_i32 as u32);
#[cfg(feature = "pg13")]
let update_cols = {
let cst_ptr = *list.get(3)? as *mut pg_sys::Const;
let cst = *cst_ptr;
let count = i32::from_datum(cst.constvalue, cst.constisnull)? as usize;
let mut cols = Vec::with_capacity(count);
for i in 0..count {
let cst_ptr = *list.get(4 + i)? as *mut pg_sys::Const;
let cst = *cst_ptr;
let col = String::from_datum(cst.constvalue, cst.constisnull)?;
cols.push(col);
}
cols
};
Some(FdwModifyPrivate {
foreigntableid,
rowid_name,
rowid_typid,
#[cfg(feature = "pg13")]
update_cols,
})
})
}
}
struct FdwModifyState<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> {
instance: Option<W>,
rowid_name: String,
rowid_attno: pg_sys::AttrNumber,
rowid_typid: Oid,
opts: HashMap<String, String>,
tmp_ctx: MemoryContext,
_phantom: PhantomData<E>,
#[cfg(feature = "pg13")]
update_cols: Vec<String>,
}
impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> FdwModifyState<E, W> {
fn begin_modify(&mut self) -> Result<(), E> {
if let Some(ref mut instance) = self.instance {
instance.begin_modify(&self.opts)
} else {
Ok(())
}
}
fn insert(&mut self, row: &Row) -> Result<(), E> {
if let Some(ref mut instance) = self.instance {
instance.insert(row)
} else {
Ok(())
}
}
fn update(&mut self, rowid: &Cell, new_row: &Row) -> Result<(), E> {
if let Some(ref mut instance) = self.instance {
instance.update(rowid, new_row)
} else {
Ok(())
}
}
fn delete(&mut self, rowid: &Cell) -> Result<(), E> {
if let Some(ref mut instance) = self.instance {
instance.delete(rowid)
} else {
Ok(())
}
}
fn end_modify(&mut self) -> Result<(), E> {
if let Some(ref mut instance) = self.instance {
instance.end_modify()
} else {
Ok(())
}
}
}
impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> Drop for FdwModifyState<E, W> {
fn drop(&mut self) {
self.instance.take();
unsafe {
memctx::delete_wrappers_memctx(self.tmp_ctx);
self.tmp_ctx = ptr::null::<MemoryContextData>() as _;
}
}
}
unsafe fn drop_fdw_modify_state<E: Into<ErrorReport>, W: ForeignDataWrapper<E>>(
fdw_state: *mut FdwModifyState<E, W>,
) {
let boxed_fdw_state = unsafe { Box::from_raw(fdw_state) };
drop(boxed_fdw_state);
}
unsafe fn find_rowid_column(
target_relation: pg_sys::Relation,
) -> Option<pg_sys::FormData_pg_attribute> {
let ftable = unsafe { pg_sys::GetForeignTable((*target_relation).rd_id) };
let opts = unsafe { options_to_hashmap((*ftable).options).report_unwrap() };
let rowid_name = require_option("rowid_column", &opts).report_unwrap();
let tup_desc = unsafe { PgTupleDesc::from_pg_copy((*target_relation).rd_att) };
for attr in tup_desc.iter().filter(|a| !a.is_dropped()) {
if pgrx::name_data_to_str(&attr.attname) == rowid_name {
return Some(*attr);
}
}
report_error(
PgSqlErrorCode::ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION,
"cannot find rowid_column attribute in the foreign table",
);
None
}
#[cfg(feature = "pg13")]
#[pg_guard]
pub(super) extern "C-unwind" fn add_foreign_update_targets(
parsetree: *mut pg_sys::Query,
_target_rte: *mut pg_sys::RangeTblEntry,
target_relation: pg_sys::Relation,
) {
debug2!("---> add_foreign_update_targets");
unsafe {
if let Some(attr) = find_rowid_column(target_relation) {
let var = pg_sys::makeVar(
(*parsetree).resultRelation as _,
attr.attnum,
attr.atttypid,
attr.atttypmod,
attr.attcollation,
0,
);
let tle = pg_sys::makeTargetEntry(
var as _,
((*(*parsetree).targetList).length + 1) as _,
pg_sys::pstrdup(attr.attname.data.as_ptr()),
true,
);
(*parsetree).targetList = pg_sys::lappend((*parsetree).targetList, tle as _);
}
}
}
#[cfg(not(feature = "pg13"))]
#[pg_guard]
pub(super) extern "C-unwind" fn add_foreign_update_targets(
root: *mut pg_sys::PlannerInfo,
rtindex: pg_sys::Index,
_target_rte: *mut pg_sys::RangeTblEntry,
target_relation: pg_sys::Relation,
) {
debug2!("---> add_foreign_update_targets");
unsafe {
if let Some(attr) = find_rowid_column(target_relation) {
let var = pg_sys::makeVar(
rtindex as _,
attr.attnum,
attr.atttypid,
attr.atttypmod,
attr.attcollation,
0,
);
pg_sys::add_row_identity_var(root, var, rtindex, &attr.attname.data as _);
}
}
}
#[pg_guard]
#[allow(clippy::extra_unused_type_parameters)]
pub(super) extern "C-unwind" fn plan_foreign_modify<
E: Into<ErrorReport>,
W: ForeignDataWrapper<E>,
>(
root: *mut pg_sys::PlannerInfo,
plan: *mut pg_sys::ModifyTable,
result_relation: pg_sys::Index,
_subplan_index: c_int,
) -> *mut pg_sys::List {
debug2!("---> plan_foreign_modify");
unsafe {
if !(*plan).returningLists.is_null() {
report_error(
PgSqlErrorCode::ERRCODE_FDW_ERROR,
"RETURNING is not supported",
)
}
let rte = pg_sys::planner_rt_fetch(result_relation, root);
let rel = PgRelation::with_lock((*rte).relid, pg_sys::NoLock as _);
let ftable = pg_sys::GetForeignTable(rel.oid());
let opts = options_to_hashmap((*ftable).options).report_unwrap();
let rowid_name = opts.get("rowid_column");
if rowid_name.is_none() {
report_error(
PgSqlErrorCode::ERRCODE_FDW_OPTION_NAME_NOT_FOUND,
"option 'rowid_column' is required",
);
return ptr::null_mut();
}
let rowid_name = rowid_name.unwrap();
let tup_desc = PgTupleDesc::from_relation(&rel);
for attr in tup_desc.iter().filter(|a| !a.attisdropped) {
let attname = pgrx::name_data_to_str(&attr.attname);
if attname == rowid_name {
let foreigntableid = rel.oid();
#[cfg(feature = "pg13")]
let update_cols = {
let mut cols = Vec::new();
let tgts: pgrx::PgList<pg_sys::TargetEntry> =
pgrx::PgList::from_pg((*(*root).parse).targetList);
for tgt in tgts.iter_ptr() {
let col_name = std::ffi::CStr::from_ptr((*tgt).resname)
.to_str()
.unwrap()
.to_owned();
if !(*tgt).resjunk {
cols.push(col_name);
}
}
cols
};
let private = FdwModifyPrivate {
foreigntableid,
rowid_name: rowid_name.to_string(),
rowid_typid: attr.atttypid,
#[cfg(feature = "pg13")]
update_cols,
};
return private.serialize_to_list();
}
}
report_error(
PgSqlErrorCode::ERRCODE_FDW_ERROR,
&format!("rowid_column attribute {rowid_name:?} does not exist",),
);
ptr::null_mut()
}
}
#[pg_guard]
pub(super) extern "C-unwind" fn begin_foreign_modify<
E: Into<ErrorReport>,
W: ForeignDataWrapper<E>,
>(
mtstate: *mut pg_sys::ModifyTableState,
rinfo: *mut pg_sys::ResultRelInfo,
fdw_private: *mut pg_sys::List,
_subplan_index: c_int,
eflags: c_int,
) {
debug2!("---> begin_foreign_modify");
if eflags & pg_sys::EXEC_FLAG_EXPLAIN_ONLY as c_int > 0 {
return;
}
unsafe {
let private = FdwModifyPrivate::deserialize_from_list(fdw_private);
if private.is_none() {
report_error(
PgSqlErrorCode::ERRCODE_FDW_ERROR,
"invalid fdw_private data in begin_foreign_modify",
);
return;
}
let private = private.unwrap();
let ctx_name = format!("Wrappers_modify_{}", private.foreigntableid.to_u32());
let tmp_ctx = memctx::create_wrappers_memctx(&ctx_name);
let fdw_instance: W = instance::create_fdw_instance_from_table_id(private.foreigntableid);
let ftable = pg_sys::GetForeignTable(private.foreigntableid);
let mut opts = options_to_hashmap((*ftable).options).report_unwrap();
opts.insert(
"wrappers.fserver_oid".into(),
(*ftable).serverid.to_u32().to_string(),
);
opts.insert(
"wrappers.ftable_oid".into(),
(*ftable).relid.to_u32().to_string(),
);
let mut state = FdwModifyState::<E, W> {
instance: Some(fdw_instance),
rowid_name: private.rowid_name,
rowid_attno: 0, rowid_typid: private.rowid_typid,
opts,
tmp_ctx,
_phantom: PhantomData,
#[cfg(feature = "pg13")]
update_cols: private.update_cols,
};
#[cfg(feature = "pg13")]
let subplan = (*(*(*mtstate).mt_plans.offset(_subplan_index as _))).plan;
#[cfg(not(feature = "pg13"))]
let subplan = (*polyfill::outer_plan_state(&mut (*mtstate).ps)).plan;
let rowid_name_c = PgMemoryContexts::For(state.tmp_ctx).pstrdup(&state.rowid_name);
state.rowid_attno =
pg_sys::ExecFindJunkAttributeInTlist((*subplan).targetlist, rowid_name_c);
let state_ptr = Box::leak(Box::new(state));
let mut state = PgBox::<FdwModifyState<E, W>>::from_pg(state_ptr as _);
let result = state.begin_modify();
if result.is_err() {
drop_fdw_modify_state(state.as_ptr());
result.report_unwrap();
}
(*rinfo).ri_FdwState = state.into_pg() as _;
}
}
#[pg_guard]
pub(super) extern "C-unwind" fn exec_foreign_insert<
E: Into<ErrorReport>,
W: ForeignDataWrapper<E>,
>(
_estate: *mut pg_sys::EState,
rinfo: *mut pg_sys::ResultRelInfo,
slot: *mut pg_sys::TupleTableSlot,
_plan_slot: *mut pg_sys::TupleTableSlot,
) -> *mut pg_sys::TupleTableSlot {
debug2!("---> exec_foreign_insert");
unsafe {
let mut state = PgBox::<FdwModifyState<E, W>>::from_pg(
(*rinfo).ri_FdwState as *mut FdwModifyState<E, W>,
);
let result = PgMemoryContexts::For(state.tmp_ctx).switch_to(|_| {
let row = utils::tuple_table_slot_to_row(slot);
state.insert(&row)
});
if result.is_err() {
drop_fdw_modify_state(state.as_ptr());
(*rinfo).ri_FdwState = ptr::null::<FdwModifyState<E, W>>() as _;
result.report_unwrap();
}
}
slot
}
unsafe fn get_rowid_cell<E: Into<ErrorReport>, W: ForeignDataWrapper<E>>(
state: &FdwModifyState<E, W>,
plan_slot: *mut pg_sys::TupleTableSlot,
) -> Option<Cell> {
let mut is_null: bool = true;
unsafe {
let datum = polyfill::slot_getattr(plan_slot, state.rowid_attno.into(), &mut is_null);
Cell::from_polymorphic_datum(datum, is_null, state.rowid_typid)
}
}
#[pg_guard]
pub(super) extern "C-unwind" fn exec_foreign_delete<
E: Into<ErrorReport>,
W: ForeignDataWrapper<E>,
>(
_estate: *mut pg_sys::EState,
rinfo: *mut pg_sys::ResultRelInfo,
slot: *mut pg_sys::TupleTableSlot,
plan_slot: *mut pg_sys::TupleTableSlot,
) -> *mut pg_sys::TupleTableSlot {
debug2!("---> exec_foreign_delete");
unsafe {
let mut state = PgBox::<FdwModifyState<E, W>>::from_pg(
(*rinfo).ri_FdwState as *mut FdwModifyState<E, W>,
);
let result = PgMemoryContexts::For(state.tmp_ctx).switch_to(|_| {
let cell = get_rowid_cell(&state, plan_slot);
if let Some(rowid) = cell {
state.delete(&rowid)
} else {
Ok(())
}
});
if result.is_err() {
drop_fdw_modify_state(state.as_ptr());
(*rinfo).ri_FdwState = ptr::null::<FdwModifyState<E, W>>() as _;
result.report_unwrap();
}
}
slot
}
#[pg_guard]
pub(super) extern "C-unwind" fn exec_foreign_update<
E: Into<ErrorReport>,
W: ForeignDataWrapper<E>,
>(
_estate: *mut pg_sys::EState,
rinfo: *mut pg_sys::ResultRelInfo,
slot: *mut pg_sys::TupleTableSlot,
plan_slot: *mut pg_sys::TupleTableSlot,
) -> *mut pg_sys::TupleTableSlot {
debug2!("---> exec_foreign_update");
unsafe {
let mut state = PgBox::<FdwModifyState<E, W>>::from_pg(
(*rinfo).ri_FdwState as *mut FdwModifyState<E, W>,
);
let result = PgMemoryContexts::For(state.tmp_ctx).switch_to(|_| {
let rowid_cell = get_rowid_cell(&state, plan_slot);
if let Some(rowid) = rowid_cell {
let mut new_row = utils::tuple_table_slot_to_row(plan_slot);
let tup_desc = PgTupleDesc::from_pg_copy((*slot).tts_tupleDescriptor);
new_row.retain(|(col, _)| {
let is_ft_col = tup_desc.iter().filter(|a| !a.attisdropped).any(|a| {
let attr_name = pgrx::name_data_to_str(&a.attname);
attr_name == col.as_str()
});
#[cfg(not(feature = "pg13"))]
{
is_ft_col && state.rowid_name != col.as_str()
}
#[cfg(feature = "pg13")]
{
is_ft_col && state.update_cols.iter().any(|c| c == col.as_str())
}
});
state.update(&rowid, &new_row)
} else {
Ok(())
}
});
if result.is_err() {
drop_fdw_modify_state(state.as_ptr());
(*rinfo).ri_FdwState = ptr::null::<FdwModifyState<E, W>>() as _;
result.report_unwrap();
}
}
slot
}
#[pg_guard]
pub(super) extern "C-unwind" fn end_foreign_modify<
E: Into<ErrorReport>,
W: ForeignDataWrapper<E>,
>(
_estate: *mut pg_sys::EState,
rinfo: *mut pg_sys::ResultRelInfo,
) {
debug2!("---> end_foreign_modify");
unsafe {
let fdw_state = (*rinfo).ri_FdwState as *mut FdwModifyState<E, W>;
if fdw_state.is_null() {
return;
}
let mut state = PgBox::<FdwModifyState<E, W>>::from_pg(fdw_state);
let result = state.end_modify();
drop_fdw_modify_state(state.as_ptr());
(*rinfo).ri_FdwState = ptr::null::<FdwModifyState<E, W>>() as _;
result.report_unwrap();
}
}