use pgrx::FromDatum;
use pgrx::{
IntoDatum, PgSqlErrorCode, debug2,
memcxt::PgMemoryContexts,
pg_sys::{Datum, MemoryContext, MemoryContextData, Oid, ParamKind},
prelude::*,
};
use std::collections::HashMap;
use std::marker::PhantomData;
use pgrx::pg_sys::panic::ErrorReport;
use std::os::raw::c_int;
use std::ptr;
use crate::instance;
use crate::interface::{Aggregate, Cell, Column, Limit, Qual, Row, Sort, Value};
use crate::limit::*;
use crate::memctx;
use crate::options::options_to_hashmap;
use crate::polyfill;
use crate::prelude::ForeignDataWrapper;
use crate::qual::*;
use crate::sort::*;
use crate::utils::{self, ReportableError, SerdeList, report_error};
pub(crate) struct FdwState<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> {
pub(crate) instance: Option<W>,
pub(crate) quals: Vec<Qual>,
pub(crate) tgts: Vec<Column>,
pub(crate) sorts: Vec<Sort>,
pub(crate) limit: Option<Limit>,
pub(crate) opts: HashMap<String, String>,
pub(crate) aggregates: Vec<Aggregate>,
pub(crate) group_by: Vec<Column>,
tmp_ctx: MemoryContext,
values: Vec<Datum>,
nulls: Vec<bool>,
row: Row,
param_fingerprint: String,
_phantom: PhantomData<E>,
}
impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> FdwState<E, W> {
unsafe fn new(foreigntableid: Oid, tmp_ctx: MemoryContext) -> Self {
Self {
instance: Some(unsafe { instance::create_fdw_instance_from_table_id(foreigntableid) }),
quals: Vec::new(),
tgts: Vec::new(),
sorts: Vec::new(),
limit: None,
opts: HashMap::new(),
aggregates: Vec::new(),
group_by: Vec::new(),
tmp_ctx,
values: Vec::new(),
nulls: Vec::new(),
row: Row::new(),
param_fingerprint: String::new(),
_phantom: PhantomData,
}
}
#[inline]
fn get_rel_size(&mut self) -> Result<(i64, i32), E> {
if let Some(ref mut instance) = self.instance {
instance.get_rel_size(
&self.quals,
&self.tgts,
&self.sorts,
&self.limit,
&self.opts,
)
} else {
Ok((0, 0))
}
}
#[inline]
pub(crate) fn is_aggregate_scan(&self) -> bool {
!self.aggregates.is_empty()
}
#[inline]
fn begin_aggregate_scan(&mut self) -> Result<(), E> {
if let Some(ref mut instance) = self.instance {
instance.begin_aggregate_scan(&self.aggregates, &self.group_by, &self.quals, &self.opts)
} else {
Ok(())
}
}
#[inline]
fn begin_scan(&mut self) -> Result<(), E> {
if let Some(ref mut instance) = self.instance {
instance.begin_scan(
&self.quals,
&self.tgts,
&self.sorts,
&self.limit,
&self.opts,
)
} else {
Ok(())
}
}
#[inline]
fn iter_scan(&mut self) -> Result<Option<()>, E> {
if let Some(ref mut instance) = self.instance {
instance.iter_scan(&mut self.row)
} else {
Ok(None)
}
}
#[inline]
fn re_scan(&mut self) -> Result<(), E> {
if let Some(ref mut instance) = self.instance {
instance.re_scan()
} else {
Ok(())
}
}
#[inline]
fn end_scan(&mut self) -> Result<(), E> {
if let Some(ref mut instance) = self.instance {
instance.end_scan()
} else {
Ok(())
}
}
}
impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> utils::SerdeList for FdwState<E, W> {}
impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> Drop for FdwState<E, W> {
fn drop(&mut self) {
self.instance.take();
unsafe {
memctx::delete_wrappers_memctx(self.tmp_ctx);
self.tmp_ctx = ptr::null::<MemoryContextData>() as _;
}
}
}
unsafe fn drop_fdw_state<E: Into<ErrorReport>, W: ForeignDataWrapper<E>>(
fdw_state: *mut FdwState<E, W>,
) {
let boxed_fdw_state = unsafe { Box::from_raw(fdw_state) };
drop(boxed_fdw_state);
}
#[pg_guard]
pub(super) extern "C-unwind" fn get_foreign_rel_size<
E: Into<ErrorReport>,
W: ForeignDataWrapper<E>,
>(
root: *mut pg_sys::PlannerInfo,
baserel: *mut pg_sys::RelOptInfo,
foreigntableid: pg_sys::Oid,
) {
debug2!("---> get_foreign_rel_size");
unsafe {
let ctx_name = format!("Wrappers_scan_{}", foreigntableid.to_u32());
let ctx = memctx::create_wrappers_memctx(&ctx_name);
let mut state = FdwState::<E, W>::new(foreigntableid, ctx);
PgMemoryContexts::For(state.tmp_ctx).switch_to(|_| {
state.quals = extract_quals(root, baserel, foreigntableid);
state.tgts = utils::extract_target_columns(root, baserel);
state.sorts = extract_sorts(root, baserel, foreigntableid);
state.limit = extract_limit(root, baserel, foreigntableid);
let ftable = pg_sys::GetForeignTable(foreigntableid);
state.opts = options_to_hashmap((*ftable).options).report_unwrap();
state.opts.insert(
"wrappers.fserver_oid".into(),
(*ftable).serverid.to_u32().to_string(),
);
state.opts.insert(
"wrappers.ftable_oid".into(),
(*ftable).relid.to_u32().to_string(),
);
});
let (rows, width) = state.get_rel_size().report_unwrap();
(*baserel).rows = rows as f64;
(*(*baserel).reltarget).width = width;
(*baserel).fdw_private = Box::leak(Box::new(state)) as *mut FdwState<E, W> as _;
}
}
#[pg_guard]
pub(super) extern "C-unwind" fn get_foreign_paths<
E: Into<ErrorReport>,
W: ForeignDataWrapper<E>,
>(
root: *mut pg_sys::PlannerInfo,
baserel: *mut pg_sys::RelOptInfo,
_foreigntableid: pg_sys::Oid,
) {
debug2!("---> get_foreign_paths");
unsafe {
let state = PgBox::<FdwState<E, W>>::from_pg((*baserel).fdw_private as _);
let startup_cost = state
.opts
.get("startup_cost")
.map(|c| match c.parse::<f64>() {
Ok(v) => v,
Err(_) => {
pgrx::error!("invalid option startup_cost: {}", c);
}
})
.unwrap_or(0.0);
let total_cost = startup_cost + (*baserel).rows;
let path = pg_sys::create_foreignscan_path(
root,
baserel,
ptr::null_mut(), (*baserel).rows,
#[cfg(feature = "pg18")]
0, startup_cost,
total_cost,
ptr::null_mut(), ptr::null_mut(), ptr::null_mut(), #[cfg(any(feature = "pg17", feature = "pg18"))]
ptr::null_mut(), ptr::null_mut(), );
pg_sys::add_path(baserel, &mut ((*path).path));
}
}
#[pg_guard]
pub(super) extern "C-unwind" fn get_foreign_plan<E: Into<ErrorReport>, W: ForeignDataWrapper<E>>(
_root: *mut pg_sys::PlannerInfo,
baserel: *mut pg_sys::RelOptInfo,
_foreigntableid: pg_sys::Oid,
_best_path: *mut pg_sys::ForeignPath,
tlist: *mut pg_sys::List,
scan_clauses: *mut pg_sys::List,
outer_plan: *mut pg_sys::Plan,
) -> *mut pg_sys::ForeignScan {
debug2!("---> get_foreign_plan");
unsafe {
let mut state = PgBox::<FdwState<E, W>>::from_pg((*baserel).fdw_private as _);
let scan_clauses = pg_sys::extract_actual_clauses(scan_clauses, false);
let is_agg = (*baserel).reloptkind == pg_sys::RelOptKind::RELOPT_UPPER_REL
&& state.is_aggregate_scan();
if !is_agg && state.is_aggregate_scan() {
state.aggregates = Vec::new();
state.group_by = Vec::new();
}
let (final_tlist, agg_fdw_scan_tlist) = if is_agg {
let reltarget = (*baserel).reltarget;
let exprs = (*reltarget).exprs;
let n = if exprs.is_null() {
0
} else {
(*exprs).length as usize
};
let mut agg_tlist: *mut pg_sys::List = ptr::null_mut();
for i in 0..n {
let cell = (*exprs).elements.add(i);
let expr = (*cell).ptr_value as *mut pg_sys::Expr;
let tle = pg_sys::makeTargetEntry(
expr,
(i + 1) as pg_sys::AttrNumber,
ptr::null_mut(),
false,
);
if !(*reltarget).sortgrouprefs.is_null() {
(*tle).ressortgroupref = *(*reltarget).sortgrouprefs.add(i);
}
agg_tlist = pg_sys::lappend(agg_tlist, tle as *mut std::ffi::c_void);
}
let fdw_scan_tlist = pg_sys::list_copy(agg_tlist);
let mut new_tgts = Vec::new();
let mut col_num = 1usize;
for col in &state.group_by {
new_tgts.push(Column {
name: col.name.clone(),
num: col_num,
type_oid: col.type_oid,
});
col_num += 1;
}
for agg in &state.aggregates {
new_tgts.push(Column {
name: agg.alias.clone(),
num: col_num,
type_oid: agg.type_oid,
});
col_num += 1;
}
state.tgts = new_tgts;
(agg_tlist, fdw_scan_tlist)
} else {
(tlist, ptr::null_mut())
};
let fdw_private =
PgMemoryContexts::For(state.tmp_ctx).switch_to(|_| FdwState::serialize_to_list(state));
pg_sys::make_foreignscan(
final_tlist,
scan_clauses,
(*baserel).relid,
ptr::null_mut(),
fdw_private as _,
agg_fdw_scan_tlist,
ptr::null_mut(),
outer_plan,
)
}
}
#[pg_guard]
pub(super) extern "C-unwind" fn explain_foreign_scan<
E: Into<ErrorReport>,
W: ForeignDataWrapper<E>,
>(
node: *mut pg_sys::ForeignScanState,
es: *mut pg_sys::ExplainState,
) {
debug2!("---> explain_foreign_scan");
unsafe {
let fdw_state = (*node).fdw_state as *mut FdwState<E, W>;
if fdw_state.is_null() {
return;
}
let state = PgBox::<FdwState<E, W>>::from_pg(fdw_state);
let ctx = PgMemoryContexts::For(state.tmp_ctx);
let label = ctx.pstrdup("Wrappers");
let value = ctx.pstrdup(&format!("quals = {:?}", state.quals));
pg_sys::ExplainPropertyText(label, value, es);
let value = ctx.pstrdup(&format!("tgts = {:?}", state.tgts));
pg_sys::ExplainPropertyText(label, value, es);
let value = ctx.pstrdup(&format!("sorts = {:?}", state.sorts));
pg_sys::ExplainPropertyText(label, value, es);
let value = ctx.pstrdup(&format!("limit = {:?}", state.limit));
pg_sys::ExplainPropertyText(label, value, es);
if !state.aggregates.is_empty() {
let value = ctx.pstrdup(&format!("aggregates = {:?}", state.aggregates));
pg_sys::ExplainPropertyText(label, value, es);
let value = ctx.pstrdup(&format!("group_by = {:?}", state.group_by));
pg_sys::ExplainPropertyText(label, value, es);
}
}
}
unsafe fn assign_parameter_value<E: Into<ErrorReport>, W: ForeignDataWrapper<E>>(
node: *mut pg_sys::ForeignScanState,
state: &mut FdwState<E, W>,
) {
unsafe {
let estate = (*node).ss.ps.state;
let econtext = (*node).ss.ps.ps_ExprContext;
for qual in &mut state.quals.iter_mut() {
if let Some(param) = &mut qual.param {
let mut current_value: Option<Value> = None;
match param.kind {
ParamKind::PARAM_EXTERN => {
let plist_info = (*estate).es_param_list_info;
if !plist_info.is_null() {
let params_cnt = (*plist_info).numParams as usize;
if param.id > 0 && param.id <= params_cnt {
let plist = (*plist_info).params.as_slice(params_cnt);
let p: pg_sys::ParamExternData = plist[param.id - 1];
if let Some(cell) =
Cell::from_polymorphic_datum(p.value, p.isnull, p.ptype)
{
qual.value = Value::Cell(cell.clone());
current_value = Some(Value::Cell(cell));
}
}
}
}
ParamKind::PARAM_EXEC => {
param.expr_eval.expr_state = pg_sys::ExecInitExpr(
param.expr_eval.expr,
node as *mut pg_sys::PlanState,
);
let mut isnull = false;
if let Some(datum) = polyfill::exec_eval_expr(
param.expr_eval.expr_state,
econtext,
&mut isnull,
) && let Some(cell) =
Cell::from_polymorphic_datum(datum, isnull, param.type_oid)
{
qual.value = Value::Cell(cell.clone());
current_value = Some(Value::Cell(cell));
}
}
_ => {}
}
let mut eval_value = param
.eval_value
.lock()
.expect("param.eval_value should be locked");
*eval_value = current_value;
}
}
}
}
fn compute_param_fingerprint<E: Into<ErrorReport>, W: ForeignDataWrapper<E>>(
state: &FdwState<E, W>,
) -> String {
state
.quals
.iter()
.filter_map(|qual| {
qual.param.as_ref().map(|param| {
let eval_value = match param.eval_value.lock() {
Ok(value) => format!("{:?}", *value),
Err(_) => "lock_error".to_string(),
};
format!(
"{}|{}|{}|{}|{}|{}|{}",
qual.field,
qual.operator,
qual.use_or,
param.kind,
param.id,
param.type_oid,
eval_value,
)
})
})
.collect::<Vec<_>>()
.join(";")
}
#[pg_guard]
pub(super) extern "C-unwind" fn begin_foreign_scan<
E: Into<ErrorReport>,
W: ForeignDataWrapper<E>,
>(
node: *mut pg_sys::ForeignScanState,
eflags: c_int,
) {
debug2!("---> begin_foreign_scan");
unsafe {
let scan_state = (*node).ss;
let plan = scan_state.ps.plan as *mut pg_sys::ForeignScan;
let mut state = FdwState::<E, W>::deserialize_from_list((*plan).fdw_private as _);
assert!(!state.is_null());
assign_parameter_value(node, &mut state);
state.param_fingerprint = compute_param_fingerprint(&state);
if eflags & pg_sys::EXEC_FLAG_EXPLAIN_ONLY as c_int <= 0 {
let result = if state.is_aggregate_scan() {
state.begin_aggregate_scan()
} else {
state.begin_scan()
};
if result.is_err() {
drop_fdw_state(state.as_ptr());
(*plan).fdw_private = ptr::null::<FdwState<E, W>>() as _;
result.report_unwrap();
}
let natts = if state.is_aggregate_scan() {
state.tgts.len()
} else {
let rel = scan_state.ss_currentRelation;
(*(*rel).rd_att).natts as usize
};
state
.values
.extend_from_slice(&vec![0.into_datum().unwrap(); natts]);
state.nulls.extend_from_slice(&vec![true; natts]);
}
(*node).fdw_state = state.into_pg() as _;
}
}
#[pg_guard]
pub(super) extern "C-unwind" fn iterate_foreign_scan<
E: Into<ErrorReport>,
W: ForeignDataWrapper<E>,
>(
node: *mut pg_sys::ForeignScanState,
) -> *mut pg_sys::TupleTableSlot {
unsafe {
let mut state = PgBox::<FdwState<E, W>>::from_pg((*node).fdw_state as _);
assign_parameter_value(node, &mut state);
let slot = (*node).ss.ss_ScanTupleSlot;
polyfill::exec_clear_tuple(slot);
state.row.clear();
let result = state.iter_scan();
if result.is_err() {
drop_fdw_state(state.as_ptr());
(*node).fdw_state = ptr::null::<FdwState<E, W>>() as _;
}
if result.report_unwrap().is_some() {
if state.row.cols.len() != state.tgts.len() {
report_error(
PgSqlErrorCode::ERRCODE_FDW_INVALID_COLUMN_NUMBER,
"target column number not match",
);
return slot;
}
let is_agg = state.is_aggregate_scan();
PgMemoryContexts::For(state.tmp_ctx).switch_to(|_| {
for i in 0..state.row.cells.len() {
let att_idx = state.tgts[i].num - 1;
let cell = state.row.cells.get_unchecked_mut(i);
match cell.take() {
Some(cell) => {
state.values[att_idx] = cell.into_datum().unwrap();
state.nulls[att_idx] = false;
}
None => {
state.nulls[att_idx] = true;
}
}
}
if is_agg {
let desc = (*slot).tts_tupleDescriptor;
let htup = pg_sys::heap_form_tuple(
desc,
state.values.as_mut_ptr(),
state.nulls.as_mut_ptr(),
);
pg_sys::ExecStoreHeapTuple(htup, slot, true);
} else {
(*slot).tts_values = state.values.as_mut_ptr();
(*slot).tts_isnull = state.nulls.as_mut_ptr();
pg_sys::ExecStoreVirtualTuple(slot);
}
});
}
slot
}
}
#[pg_guard]
pub(super) extern "C-unwind" fn re_scan_foreign_scan<
E: Into<ErrorReport>,
W: ForeignDataWrapper<E>,
>(
node: *mut pg_sys::ForeignScanState,
) {
debug2!("---> re_scan_foreign_scan");
unsafe {
let fdw_state = (*node).fdw_state as *mut FdwState<E, W>;
if !fdw_state.is_null() {
let mut state = PgBox::<FdwState<E, W>>::from_pg(fdw_state);
assign_parameter_value(node, &mut state);
let next_fingerprint = compute_param_fingerprint(&state);
let result = if next_fingerprint != state.param_fingerprint {
state.param_fingerprint = next_fingerprint;
let _ = state.end_scan();
state.begin_scan()
} else {
state.re_scan()
};
if result.is_err() {
drop_fdw_state(state.as_ptr());
(*node).fdw_state = ptr::null::<FdwState<E, W>>() as _;
result.report_unwrap();
}
}
}
}
#[pg_guard]
pub(super) extern "C-unwind" fn end_foreign_scan<E: Into<ErrorReport>, W: ForeignDataWrapper<E>>(
node: *mut pg_sys::ForeignScanState,
) {
debug2!("---> end_foreign_scan");
unsafe {
let fdw_state = (*node).fdw_state as *mut FdwState<E, W>;
if fdw_state.is_null() {
return;
}
let mut state = PgBox::<FdwState<E, W>>::from_pg(fdw_state);
let result = state.end_scan();
drop_fdw_state(state.as_ptr());
(*node).fdw_state = ptr::null::<FdwState<E, W>>() as _;
result.report_unwrap();
}
}