use crate::FdwRoutine;
use crate::instance::ForeignServer;
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::prelude::{Date, Interval, Time, Timestamp, TimestampWithTimeZone};
use pgrx::{
AllocatedByRust, AnyNumeric, FromDatum, IntoDatum, JsonB, PgBuiltInOids, PgOid,
datum::Uuid,
fcinfo,
pg_sys::{self, BuiltinOid, Datum, Expr, ExprState, Oid, bytea},
};
use std::collections::HashMap;
use std::ffi::CStr;
use std::fmt;
use std::iter::Zip;
use std::mem;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
pub const FOREIGN_DATA_WRAPPER_RELATION_ID: Oid = BuiltinOid::ForeignDataWrapperRelationId.value();
pub const FOREIGN_SERVER_RELATION_ID: Oid = BuiltinOid::ForeignServerRelationId.value();
pub const FOREIGN_TABLE_RELATION_ID: Oid = BuiltinOid::ForeignTableRelationId.value();
#[derive(Debug)]
pub enum Cell {
Bool(bool),
I8(i8),
I16(i16),
F32(f32),
I32(i32),
F64(f64),
I64(i64),
Numeric(AnyNumeric),
String(String),
Date(Date),
Time(Time),
Timestamp(Timestamp),
Timestamptz(TimestampWithTimeZone),
Interval(Interval),
Json(JsonB),
Bytea(*mut bytea),
Uuid(Uuid),
BoolArray(Vec<Option<bool>>),
I16Array(Vec<Option<i16>>),
I32Array(Vec<Option<i32>>),
I64Array(Vec<Option<i64>>),
F32Array(Vec<Option<f32>>),
F64Array(Vec<Option<f64>>),
StringArray(Vec<Option<String>>),
}
impl Cell {
pub fn is_array(&self) -> bool {
matches!(
self,
Cell::BoolArray(_)
| Cell::I16Array(_)
| Cell::I32Array(_)
| Cell::I64Array(_)
| Cell::F32Array(_)
| Cell::F64Array(_)
| Cell::StringArray(_)
)
}
}
unsafe impl Send for Cell {}
impl Clone for Cell {
fn clone(&self) -> Self {
match self {
Cell::Bool(v) => Cell::Bool(*v),
Cell::I8(v) => Cell::I8(*v),
Cell::I16(v) => Cell::I16(*v),
Cell::F32(v) => Cell::F32(*v),
Cell::I32(v) => Cell::I32(*v),
Cell::F64(v) => Cell::F64(*v),
Cell::I64(v) => Cell::I64(*v),
Cell::Numeric(v) => Cell::Numeric(v.clone()),
Cell::String(v) => Cell::String(v.clone()),
Cell::Date(v) => Cell::Date(*v),
Cell::Time(v) => Cell::Time(*v),
Cell::Timestamp(v) => Cell::Timestamp(*v),
Cell::Timestamptz(v) => Cell::Timestamptz(*v),
Cell::Interval(v) => Cell::Interval(*v),
Cell::Json(v) => Cell::Json(JsonB(v.0.clone())),
Cell::Bytea(v) => Cell::Bytea(*v),
Cell::Uuid(v) => Cell::Uuid(*v),
Cell::BoolArray(v) => Cell::BoolArray(v.clone()),
Cell::I16Array(v) => Cell::I16Array(v.clone()),
Cell::I32Array(v) => Cell::I32Array(v.clone()),
Cell::I64Array(v) => Cell::I64Array(v.clone()),
Cell::F32Array(v) => Cell::F32Array(v.clone()),
Cell::F64Array(v) => Cell::F64Array(v.clone()),
Cell::StringArray(v) => Cell::StringArray(v.clone()),
}
}
}
fn write_array<T: std::fmt::Display>(
array: &[Option<T>],
f: &mut fmt::Formatter<'_>,
) -> fmt::Result {
let res = array
.iter()
.map(|e| match e {
Some(val) => format!("{val}",),
None => "null".to_owned(),
})
.collect::<Vec<String>>()
.join(",");
write!(f, "[{res}]",)
}
impl fmt::Display for Cell {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Cell::Bool(v) => write!(f, "{v}"),
Cell::I8(v) => write!(f, "{v}"),
Cell::I16(v) => write!(f, "{v}"),
Cell::F32(v) => write!(f, "{v}"),
Cell::I32(v) => write!(f, "{v}"),
Cell::F64(v) => write!(f, "{v}"),
Cell::I64(v) => write!(f, "{v}"),
Cell::Numeric(v) => write!(f, "{v}"),
Cell::String(v) => write!(f, "'{v}'"),
Cell::Date(v) => unsafe {
let dt =
fcinfo::direct_function_call_as_datum(pg_sys::date_out, &[(*v).into_datum()])
.expect("cell should be a valid date");
let dt_cstr = CStr::from_ptr(dt.cast_mut_ptr());
write!(
f,
"'{}'",
dt_cstr.to_str().expect("date should be a valid string")
)
},
Cell::Time(v) => unsafe {
let ts =
fcinfo::direct_function_call_as_datum(pg_sys::time_out, &[(*v).into_datum()])
.expect("cell should be a valid time");
let ts_cstr = CStr::from_ptr(ts.cast_mut_ptr());
write!(
f,
"'{}'",
ts_cstr.to_str().expect("time hould be a valid string")
)
},
Cell::Timestamp(v) => unsafe {
let ts = fcinfo::direct_function_call_as_datum(
pg_sys::timestamp_out,
&[(*v).into_datum()],
)
.expect("cell should be a valid timestamp");
let ts_cstr = CStr::from_ptr(ts.cast_mut_ptr());
write!(
f,
"'{}'",
ts_cstr
.to_str()
.expect("timestamp should be a valid string")
)
},
Cell::Timestamptz(v) => unsafe {
let ts = fcinfo::direct_function_call_as_datum(
pg_sys::timestamptz_out,
&[(*v).into_datum()],
)
.expect("cell should be a valid timestamptz");
let ts_cstr = CStr::from_ptr(ts.cast_mut_ptr());
write!(
f,
"'{}'",
ts_cstr
.to_str()
.expect("timestamptz should be a valid string")
)
},
Cell::Interval(v) => write!(f, "{v}"),
Cell::Json(v) => write!(f, "{v:?}"),
Cell::Bytea(v) => {
let byte_u8 = unsafe { pgrx::varlena::varlena_to_byte_slice(*v) };
let hex = byte_u8
.iter()
.map(|b| format!("{b:02X}"))
.collect::<Vec<String>>()
.join("");
if hex.is_empty() {
write!(f, "''")
} else {
write!(f, r#"'\x{hex}'"#,)
}
}
Cell::Uuid(v) => write!(f, "'{v}'",),
Cell::BoolArray(v) => write_array(v, f),
Cell::I16Array(v) => write_array(v, f),
Cell::I32Array(v) => write_array(v, f),
Cell::I64Array(v) => write_array(v, f),
Cell::F32Array(v) => write_array(v, f),
Cell::F64Array(v) => write_array(v, f),
Cell::StringArray(v) => write_array(v, f),
}
}
}
impl IntoDatum for Cell {
fn into_datum(self) -> Option<Datum> {
match self {
Cell::Bool(v) => v.into_datum(),
Cell::I8(v) => v.into_datum(),
Cell::I16(v) => v.into_datum(),
Cell::F32(v) => v.into_datum(),
Cell::I32(v) => v.into_datum(),
Cell::F64(v) => v.into_datum(),
Cell::I64(v) => v.into_datum(),
Cell::Numeric(v) => v.into_datum(),
Cell::String(v) => v.into_datum(),
Cell::Date(v) => v.into_datum(),
Cell::Time(v) => v.into_datum(),
Cell::Timestamp(v) => v.into_datum(),
Cell::Timestamptz(v) => v.into_datum(),
Cell::Interval(v) => v.into_datum(),
Cell::Json(v) => v.into_datum(),
Cell::Bytea(v) => Some(Datum::from(v)),
Cell::Uuid(v) => v.into_datum(),
Cell::BoolArray(v) => v.into_datum(),
Cell::I16Array(v) => v.into_datum(),
Cell::I32Array(v) => v.into_datum(),
Cell::I64Array(v) => v.into_datum(),
Cell::F32Array(v) => v.into_datum(),
Cell::F64Array(v) => v.into_datum(),
Cell::StringArray(v) => v.into_datum(),
}
}
fn type_oid() -> Oid {
Oid::INVALID
}
fn is_compatible_with(other: Oid) -> bool {
Self::type_oid() == other
|| other == pg_sys::BOOLOID
|| other == pg_sys::CHAROID
|| other == pg_sys::INT2OID
|| other == pg_sys::FLOAT4OID
|| other == pg_sys::INT4OID
|| other == pg_sys::FLOAT8OID
|| other == pg_sys::INT8OID
|| other == pg_sys::NUMERICOID
|| other == pg_sys::TEXTOID
|| other == pg_sys::DATEOID
|| other == pg_sys::TIMEOID
|| other == pg_sys::TIMESTAMPOID
|| other == pg_sys::TIMESTAMPTZOID
|| other == pg_sys::INTERVALOID
|| other == pg_sys::JSONBOID
|| other == pg_sys::BYTEAOID
|| other == pg_sys::UUIDOID
|| other == pg_sys::BOOLARRAYOID
|| other == pg_sys::INT2ARRAYOID
|| other == pg_sys::INT4ARRAYOID
|| other == pg_sys::INT8ARRAYOID
|| other == pg_sys::FLOAT4ARRAYOID
|| other == pg_sys::FLOAT8ARRAYOID
|| other == pg_sys::TEXTARRAYOID
}
}
impl FromDatum for Cell {
unsafe fn from_polymorphic_datum(datum: Datum, is_null: bool, typoid: Oid) -> Option<Self>
where
Self: Sized,
{
unsafe {
let oid = PgOid::from(typoid);
match oid {
PgOid::BuiltIn(PgBuiltInOids::BOOLOID) => {
bool::from_datum(datum, is_null).map(Cell::Bool)
}
PgOid::BuiltIn(PgBuiltInOids::CHAROID) => {
i8::from_datum(datum, is_null).map(Cell::I8)
}
PgOid::BuiltIn(PgBuiltInOids::INT2OID) => {
i16::from_datum(datum, is_null).map(Cell::I16)
}
PgOid::BuiltIn(PgBuiltInOids::FLOAT4OID) => {
f32::from_datum(datum, is_null).map(Cell::F32)
}
PgOid::BuiltIn(PgBuiltInOids::INT4OID) => {
i32::from_datum(datum, is_null).map(Cell::I32)
}
PgOid::BuiltIn(PgBuiltInOids::FLOAT8OID) => {
f64::from_datum(datum, is_null).map(Cell::F64)
}
PgOid::BuiltIn(PgBuiltInOids::INT8OID) => {
i64::from_datum(datum, is_null).map(Cell::I64)
}
PgOid::BuiltIn(PgBuiltInOids::NUMERICOID) => {
AnyNumeric::from_datum(datum, is_null).map(Cell::Numeric)
}
PgOid::BuiltIn(PgBuiltInOids::TEXTOID) => {
String::from_datum(datum, is_null).map(Cell::String)
}
PgOid::BuiltIn(PgBuiltInOids::DATEOID) => {
Date::from_datum(datum, is_null).map(Cell::Date)
}
PgOid::BuiltIn(PgBuiltInOids::TIMEOID) => {
Time::from_datum(datum, is_null).map(Cell::Time)
}
PgOid::BuiltIn(PgBuiltInOids::TIMESTAMPOID) => {
Timestamp::from_datum(datum, is_null).map(Cell::Timestamp)
}
PgOid::BuiltIn(PgBuiltInOids::TIMESTAMPTZOID) => {
TimestampWithTimeZone::from_datum(datum, is_null).map(Cell::Timestamptz)
}
PgOid::BuiltIn(PgBuiltInOids::INTERVALOID) => {
Interval::from_datum(datum, is_null).map(Cell::Interval)
}
PgOid::BuiltIn(PgBuiltInOids::JSONBOID) => {
JsonB::from_datum(datum, is_null).map(Cell::Json)
}
PgOid::BuiltIn(PgBuiltInOids::BYTEAOID) => {
if is_null {
None
} else {
Some(Cell::Bytea(datum.cast_mut_ptr::<bytea>()))
}
}
PgOid::BuiltIn(PgBuiltInOids::UUIDOID) => {
Uuid::from_datum(datum, is_null).map(Cell::Uuid)
}
PgOid::BuiltIn(PgBuiltInOids::BOOLARRAYOID) => {
Vec::<Option<bool>>::from_datum(datum, false).map(Cell::BoolArray)
}
PgOid::BuiltIn(PgBuiltInOids::INT2ARRAYOID) => {
Vec::<Option<i16>>::from_datum(datum, false).map(Cell::I16Array)
}
PgOid::BuiltIn(PgBuiltInOids::INT4ARRAYOID) => {
Vec::<Option<i32>>::from_datum(datum, false).map(Cell::I32Array)
}
PgOid::BuiltIn(PgBuiltInOids::INT8ARRAYOID) => {
Vec::<Option<i64>>::from_datum(datum, false).map(Cell::I64Array)
}
PgOid::BuiltIn(PgBuiltInOids::FLOAT4ARRAYOID) => {
Vec::<Option<f32>>::from_datum(datum, false).map(Cell::F32Array)
}
PgOid::BuiltIn(PgBuiltInOids::FLOAT8ARRAYOID) => {
Vec::<Option<f64>>::from_datum(datum, false).map(Cell::F64Array)
}
PgOid::BuiltIn(PgBuiltInOids::TEXTARRAYOID) => {
Vec::<Option<String>>::from_datum(datum, false).map(Cell::StringArray)
}
PgOid::Custom(_) => {
if is_null {
None
} else {
Some(Cell::Bytea(datum.cast_mut_ptr::<bytea>()))
}
}
_ => None,
}
}
}
}
pub trait CellFormatter {
fn fmt_cell(&mut self, cell: &Cell) -> String;
}
struct DefaultFormatter {}
impl DefaultFormatter {
fn new() -> Self {
Self {}
}
}
impl CellFormatter for DefaultFormatter {
fn fmt_cell(&mut self, cell: &Cell) -> String {
format!("{cell}",)
}
}
#[derive(Debug, Clone, Default)]
pub struct Row {
pub cols: Vec<String>,
pub cells: Vec<Option<Cell>>,
}
impl Row {
pub fn new() -> Self {
Self::default()
}
pub fn push(&mut self, col: &str, cell: Option<Cell>) {
self.cols.push(col.to_owned());
self.cells.push(cell);
}
pub fn iter(&self) -> Zip<Iter<'_, String>, Iter<'_, Option<Cell>>> {
self.cols.iter().zip(self.cells.iter())
}
pub fn retain<F>(&mut self, f: F)
where
F: FnMut((&String, &Option<Cell>)) -> bool,
{
let keep: Vec<bool> = self.iter().map(f).collect();
let mut iter = keep.iter();
self.cols.retain(|_| *iter.next().unwrap_or(&true));
iter = keep.iter();
self.cells.retain(|_| *iter.next().unwrap_or(&true));
}
#[inline]
pub fn replace_with(&mut self, src: Row) {
let _ = mem::replace(self, src);
}
pub fn clear(&mut self) {
self.cols.clear();
self.cells.clear();
}
}
#[derive(Debug, Clone, Default)]
pub struct Column {
pub name: String,
pub num: usize,
pub type_oid: Oid,
}
#[derive(Debug, Clone)]
pub enum Value {
Cell(Cell),
Array(Vec<Cell>),
}
#[derive(Debug, Clone)]
pub(super) struct ExprEval {
pub(super) expr: *mut Expr,
pub(super) expr_state: *mut ExprState,
}
unsafe impl Send for ExprEval {}
#[derive(Debug, Clone)]
pub struct Param {
pub kind: pg_sys::ParamKind::Type,
pub id: usize,
pub type_oid: Oid,
pub eval_value: Arc<Mutex<Option<Value>>>,
pub(super) expr_eval: ExprEval,
}
#[derive(Debug, Clone)]
pub struct Qual {
pub field: String,
pub operator: String,
pub value: Value,
pub use_or: bool,
pub param: Option<Param>,
}
impl Qual {
pub fn deparse(&self) -> String {
let mut formatter = DefaultFormatter::new();
self.deparse_with_fmt(&mut formatter)
}
pub fn deparse_with_fmt<T: CellFormatter>(&self, t: &mut T) -> String {
if self.use_or {
match &self.value {
Value::Cell(_) => unreachable!(),
Value::Array(cells) => {
let conds: Vec<String> = cells
.iter()
.map(|cell| {
format!("{} {} {}", self.field, self.operator, t.fmt_cell(cell))
})
.collect();
conds.join(" or ")
}
}
} else {
match &self.value {
Value::Cell(cell) => match self.operator.as_str() {
"is" | "is not" => match cell {
Cell::String(cell) if cell == "null" => {
format!("{} {} null", self.field, self.operator)
}
_ => format!("{} {} {}", self.field, self.operator, t.fmt_cell(cell)),
},
"~~" => format!("{} like {}", self.field, t.fmt_cell(cell)),
"!~~" => format!("{} not like {}", self.field, t.fmt_cell(cell)),
_ => format!("{} {} {}", self.field, self.operator, t.fmt_cell(cell)),
},
Value::Array(_) => unreachable!(),
}
}
}
}
#[derive(Debug, Clone, Default)]
pub struct Sort {
pub field: String,
pub field_no: usize,
pub reversed: bool,
pub nulls_first: bool,
pub collate: Option<String>,
}
impl Sort {
pub fn deparse(&self) -> String {
let mut sql = self.field.to_string();
if self.reversed {
sql.push_str(" desc");
} else {
sql.push_str(" asc");
}
if self.nulls_first {
sql.push_str(" nulls first")
} else {
sql.push_str(" nulls last")
}
sql
}
pub fn deparse_with_collate(&self) -> String {
let mut sql = self.deparse();
if let Some(collate) = &self.collate {
sql.push_str(&format!(" collate {collate}"));
}
sql
}
}
#[derive(Debug, Clone, Default)]
pub struct Limit {
pub count: i64,
pub offset: i64,
}
impl Limit {
pub fn deparse(&self) -> String {
format!("limit {} offset {}", self.count, self.offset)
}
}
pub trait ForeignDataWrapper<E: Into<ErrorReport>> {
fn new(server: ForeignServer) -> Result<Self, E>
where
Self: Sized;
fn get_rel_size(
&mut self,
_quals: &[Qual],
_columns: &[Column],
_sorts: &[Sort],
_limit: &Option<Limit>,
_options: &HashMap<String, String>,
) -> Result<(i64, i32), E> {
Ok((0, 0))
}
fn begin_scan(
&mut self,
quals: &[Qual],
columns: &[Column],
sorts: &[Sort],
limit: &Option<Limit>,
options: &HashMap<String, String>,
) -> Result<(), E>;
fn iter_scan(&mut self, row: &mut Row) -> Result<Option<()>, E>;
fn re_scan(&mut self) -> Result<(), E> {
Ok(())
}
fn end_scan(&mut self) -> Result<(), E>;
fn begin_modify(&mut self, _options: &HashMap<String, String>) -> Result<(), E> {
Ok(())
}
fn insert(&mut self, _row: &Row) -> Result<(), E> {
Ok(())
}
fn update(&mut self, _rowid: &Cell, _new_row: &Row) -> Result<(), E> {
Ok(())
}
fn delete(&mut self, _rowid: &Cell) -> Result<(), E> {
Ok(())
}
fn end_modify(&mut self) -> Result<(), E> {
Ok(())
}
fn import_foreign_schema(
&mut self,
_stmt: crate::import_foreign_schema::ImportForeignSchemaStmt,
) -> Result<Vec<String>, E> {
Ok(Vec::new())
}
fn fdw_routine() -> FdwRoutine
where
Self: Sized,
{
unsafe {
use crate::{import_foreign_schema, modify, scan};
let mut fdw_routine =
FdwRoutine::<AllocatedByRust>::alloc_node(pg_sys::NodeTag::T_FdwRoutine);
fdw_routine.ImportForeignSchema =
Some(import_foreign_schema::import_foreign_schema::<E, Self>);
fdw_routine.GetForeignRelSize = Some(scan::get_foreign_rel_size::<E, Self>);
fdw_routine.GetForeignPaths = Some(scan::get_foreign_paths::<E, Self>);
fdw_routine.GetForeignPlan = Some(scan::get_foreign_plan::<E, Self>);
fdw_routine.ExplainForeignScan = Some(scan::explain_foreign_scan::<E, Self>);
fdw_routine.BeginForeignScan = Some(scan::begin_foreign_scan::<E, Self>);
fdw_routine.IterateForeignScan = Some(scan::iterate_foreign_scan::<E, Self>);
fdw_routine.ReScanForeignScan = Some(scan::re_scan_foreign_scan::<E, Self>);
fdw_routine.EndForeignScan = Some(scan::end_foreign_scan::<E, Self>);
fdw_routine.AddForeignUpdateTargets = Some(modify::add_foreign_update_targets);
fdw_routine.PlanForeignModify = Some(modify::plan_foreign_modify::<E, Self>);
fdw_routine.BeginForeignModify = Some(modify::begin_foreign_modify::<E, Self>);
fdw_routine.ExecForeignInsert = Some(modify::exec_foreign_insert::<E, Self>);
fdw_routine.ExecForeignDelete = Some(modify::exec_foreign_delete::<E, Self>);
fdw_routine.ExecForeignUpdate = Some(modify::exec_foreign_update::<E, Self>);
fdw_routine.EndForeignModify = Some(modify::end_foreign_modify::<E, Self>);
Self::fdw_routine_hook(&mut fdw_routine);
fdw_routine.into_pg_boxed()
}
}
fn fdw_routine_hook(_routine: &mut FdwRoutine<AllocatedByRust>) {}
fn validator(_options: Vec<Option<String>>, _catalog: Option<Oid>) -> Result<(), E> {
Ok(())
}
}