use crate::datum::lookup_type_name;
use crate::pg_sys::{Datum, Oid};
use crate::{
heap_getattr_raw, pg_sys, AllocatedByPostgres, AllocatedByRust, FromDatum, IntoDatum, PgBox,
PgMemoryContexts, PgTupleDesc, TriggerTuple, TryFromDatumError, WhoAllocated,
};
use pgx_sql_entity_graph::metadata::{
ArgumentError, Returns, ReturnsError, SqlMapping, SqlTranslatable,
};
use std::num::NonZeroUsize;
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum PgHeapTupleError {
#[error("Incorrect attribute count, found {0}, descriptor had {1}")]
IncorrectAttributeCount(usize, usize),
#[error("The specified composite type, {0}, does not exist")]
NoSuchType(String),
}
pub struct PgHeapTuple<'a, AllocatedBy: WhoAllocated> {
tuple: PgBox<pg_sys::HeapTupleData, AllocatedBy>,
tupdesc: PgTupleDesc<'a>,
}
impl<'a> FromDatum for PgHeapTuple<'a, AllocatedByRust> {
unsafe fn from_polymorphic_datum(
composite: pg_sys::Datum,
is_null: bool,
_oid: pg_sys::Oid,
) -> Option<Self> {
if is_null {
None
} else {
Some(PgHeapTuple::from_composite_datum(composite))
}
}
unsafe fn from_datum_in_memory_context(
mut memory_context: PgMemoryContexts,
composite: Datum,
is_null: bool,
_oid: pg_sys::Oid,
) -> Option<Self>
where
Self: Sized,
{
if is_null {
None
} else {
memory_context.switch_to(|_| {
let tuple = PgHeapTuple::from_composite_datum(composite);
let datum = tuple.into_composite_datum();
Some(PgHeapTuple::from_composite_datum(datum.unwrap()))
})
}
}
}
impl<'a> PgHeapTuple<'a, AllocatedByPostgres> {
pub unsafe fn from_heap_tuple(tupdesc: PgTupleDesc<'a>, heap_tuple: pg_sys::HeapTuple) -> Self {
Self { tuple: PgBox::from_pg(heap_tuple), tupdesc }
}
pub unsafe fn from_trigger_data(
trigger_data: &'a pg_sys::TriggerData,
which_tuple: TriggerTuple,
) -> Option<PgHeapTuple<'a, AllocatedByPostgres>> {
let tupdesc =
PgTupleDesc::from_pg_unchecked(trigger_data.tg_relation.as_ref().unwrap().rd_att);
let tuple = match which_tuple {
TriggerTuple::Current => trigger_data.tg_trigtuple,
TriggerTuple::New => trigger_data.tg_newtuple,
};
if tuple.is_null() {
return None;
}
Some(PgHeapTuple::from_heap_tuple(tupdesc, tuple))
}
pub fn into_owned(self) -> PgHeapTuple<'a, AllocatedByRust> {
let copy = unsafe { pg_sys::heap_copytuple(self.tuple.into_pg()) };
PgHeapTuple {
tuple: unsafe { PgBox::<pg_sys::HeapTupleData, AllocatedByRust>::from_rust(copy) },
tupdesc: self.tupdesc,
}
}
}
impl<'a> PgHeapTuple<'a, AllocatedByRust> {
pub fn new_composite_type(
type_name: &str,
) -> Result<PgHeapTuple<'a, AllocatedByRust>, PgHeapTupleError> {
let tuple_desc = PgTupleDesc::for_composite_type(type_name)
.ok_or_else(|| PgHeapTupleError::NoSuchType(type_name.to_string()))?;
let natts = tuple_desc.len();
unsafe {
let datums =
pg_sys::palloc0(natts * std::mem::size_of::<pg_sys::Datum>()) as *mut pg_sys::Datum;
let mut is_null = (0..natts).map(|_| true).collect::<Vec<_>>();
let heap_tuple =
pg_sys::heap_form_tuple(tuple_desc.as_ptr(), datums, is_null.as_mut_ptr());
Ok(PgHeapTuple {
tuple: PgBox::<pg_sys::HeapTupleData, AllocatedByRust>::from_rust(heap_tuple),
tupdesc: tuple_desc,
})
}
}
pub fn from_datums<I: IntoIterator<Item = Option<pg_sys::Datum>>>(
tupdesc: PgTupleDesc<'a>,
datums: I,
) -> Result<PgHeapTuple<'a, AllocatedByRust>, PgHeapTupleError> {
let iter = datums.into_iter();
let mut datums = Vec::<pg_sys::Datum>::with_capacity(iter.size_hint().1.unwrap_or(1));
let mut nulls = Vec::<bool>::with_capacity(iter.size_hint().1.unwrap_or(1));
iter.for_each(|datum| {
nulls.push(datum.is_none());
datums.push(datum.unwrap_or(0.into()));
});
if datums.len() != tupdesc.len() {
return Err(PgHeapTupleError::IncorrectAttributeCount(datums.len(), tupdesc.len()));
}
unsafe {
let formed_tuple =
pg_sys::heap_form_tuple(tupdesc.as_ptr(), datums.as_mut_ptr(), nulls.as_mut_ptr());
Ok(Self {
tuple: PgBox::<pg_sys::HeapTupleData, AllocatedByRust>::from_rust(formed_tuple),
tupdesc,
})
}
}
pub unsafe fn from_composite_datum(composite: pg_sys::Datum) -> Self {
let htup_header =
pg_sys::pg_detoast_datum(composite.cast_mut_ptr()) as pg_sys::HeapTupleHeader;
let tup_type = crate::heap_tuple_header_get_type_id(htup_header);
let tup_typmod = crate::heap_tuple_header_get_typmod(htup_header);
let tupdesc = pg_sys::lookup_rowtype_tupdesc(tup_type, tup_typmod);
let mut data = PgBox::<pg_sys::HeapTupleData>::alloc0();
data.t_len = crate::heap_tuple_header_get_datum_length(htup_header) as u32;
data.t_data = htup_header;
Self { tuple: data, tupdesc: PgTupleDesc::from_pg(tupdesc) }
}
pub fn set_by_name<T: IntoDatum>(
&mut self,
attname: &str,
value: T,
) -> Result<(), TryFromDatumError> {
match self.get_attribute_by_name(attname) {
None => Err(TryFromDatumError::NoSuchAttributeName(attname.to_string())),
Some((attnum, _)) => self.set_by_index(attnum, value),
}
}
pub fn set_by_index<T: IntoDatum>(
&mut self,
attno: NonZeroUsize,
value: T,
) -> Result<(), TryFromDatumError> {
unsafe {
match self.get_attribute_by_index(attno) {
None => return Err(TryFromDatumError::NoSuchAttributeNumber(attno)),
Some(att) => {
let type_oid = T::type_oid();
let composite_type_oid = value.composite_type_oid();
let is_compatible_composite_types =
type_oid == pg_sys::RECORDOID && composite_type_oid == Some(att.atttypid);
if !is_compatible_composite_types && !T::is_compatible_with(att.atttypid) {
return Err(TryFromDatumError::IncompatibleTypes {
rust_type: std::any::type_name::<T>(),
rust_oid: att.atttypid,
datum_type: lookup_type_name(type_oid),
datum_oid: type_oid,
});
}
}
}
let mut datums =
(0..self.tupdesc.len()).map(|i| pg_sys::Datum::from(i)).collect::<Vec<_>>();
let mut nulls = (0..self.tupdesc.len()).map(|_| false).collect::<Vec<_>>();
let mut do_replace = (0..self.tupdesc.len()).map(|_| false).collect::<Vec<_>>();
let datum = value.into_datum();
let attno = attno.get() - 1;
nulls[attno] = datum.is_none();
datums[attno] = datum.unwrap_or(0.into());
do_replace[attno] = true;
let new_tuple = PgBox::<pg_sys::HeapTupleData, AllocatedByRust>::from_rust(
pg_sys::heap_modify_tuple(
self.tuple.as_ptr(),
self.tupdesc.as_ptr(),
datums.as_mut_ptr(),
nulls.as_mut_ptr(),
do_replace.as_mut_ptr(),
),
);
let old_tuple = std::mem::replace(&mut self.tuple, new_tuple);
drop(old_tuple);
Ok(())
}
}
}
impl<'a, AllocatedBy: WhoAllocated> IntoDatum for PgHeapTuple<'a, AllocatedBy> {
fn into_datum(self) -> Option<pg_sys::Datum> {
self.into_composite_datum()
}
fn type_oid() -> pg_sys::Oid {
crate::pg_sys::RECORDOID
}
fn composite_type_oid(&self) -> Option<Oid> {
Some(self.tupdesc.oid())
}
fn is_compatible_with(other: pg_sys::Oid) -> bool {
fn is_composite(oid: pg_sys::Oid) -> bool {
unsafe {
let entry = pg_sys::lookup_type_cache(oid, pg_sys::TYPECACHE_TUPDESC as _);
(*entry).typtype as i8 == pg_sys::RELKIND_COMPOSITE_TYPE as i8
}
}
Self::type_oid() == other || is_composite(other)
}
}
impl<'a, AllocatedBy: WhoAllocated> PgHeapTuple<'a, AllocatedBy> {
pub fn into_composite_datum(self) -> Option<pg_sys::Datum> {
unsafe {
Some(pg_sys::heap_copy_tuple_as_datum(self.tuple.as_ptr(), self.tupdesc.as_ptr()))
}
}
pub fn into_trigger_datum(self) -> Option<pg_sys::Datum> {
self.tuple.into_datum()
}
#[inline]
pub fn into_pg(self) -> *mut pg_sys::HeapTupleData {
self.tuple.into_pg()
}
#[inline]
pub fn len(&self) -> usize {
self.tupdesc.len()
}
pub fn attributes(
&'a self,
) -> impl std::iter::Iterator<Item = (NonZeroUsize, &'a pg_sys::FormData_pg_attribute)> {
self.tupdesc.iter().enumerate().map(|(i, att)| (NonZeroUsize::new(i + 1).unwrap(), att))
}
#[inline]
pub fn get_attribute_by_index(
&'a self,
index: NonZeroUsize,
) -> Option<&'a pg_sys::FormData_pg_attribute> {
self.tupdesc.get(index.get() - 1)
}
pub fn get_attribute_by_name(
&'a self,
name: &str,
) -> Option<(NonZeroUsize, &'a pg_sys::FormData_pg_attribute)> {
for i in 0..self.len() {
let i = NonZeroUsize::new(i + 1).unwrap();
let att = self.get_attribute_by_index(i).unwrap();
if att.name() == name {
return Some((i, att));
}
}
None
}
pub fn get_by_name<T: FromDatum + IntoDatum + 'static>(
&self,
attname: &str,
) -> Result<Option<T>, TryFromDatumError> {
for att in self.tupdesc.iter() {
if att.name() == attname {
return self.get_by_index(NonZeroUsize::new(att.attnum as usize).unwrap());
}
}
Err(TryFromDatumError::NoSuchAttributeName(attname.to_owned()))
}
pub fn get_by_index<T: FromDatum + IntoDatum + 'static>(
&self,
attno: NonZeroUsize,
) -> Result<Option<T>, TryFromDatumError> {
unsafe {
match self.tupdesc.get(attno.get() - 1) {
None => Err(TryFromDatumError::NoSuchAttributeNumber(attno)),
Some(att) => {
let datum = heap_getattr_raw(self.tuple.as_ptr(), attno, self.tupdesc.as_ptr());
if datum.is_none() {
return Ok(None);
}
match T::type_oid() {
record @ pg_sys::RECORDOID => {
T::try_from_datum(datum.unwrap(), false, record)
}
_ => T::try_from_datum(datum.unwrap(), false, att.type_oid().value()),
}
}
}
}
}
}
#[macro_export]
macro_rules! composite_type {
($lt:lifetime, $composite_type:expr) => {
::pgx::heap_tuple::PgHeapTuple<$lt, ::pgx::AllocatedByRust>
};
($composite_type:expr) => {
::pgx::heap_tuple::PgHeapTuple<'static, ::pgx::AllocatedByRust>
};
}
unsafe impl SqlTranslatable for crate::heap_tuple::PgHeapTuple<'static, AllocatedByPostgres> {
fn argument_sql() -> Result<SqlMapping, ArgumentError> {
Ok(SqlMapping::Composite { array_brackets: false })
}
fn return_sql() -> Result<Returns, ReturnsError> {
Ok(Returns::One(SqlMapping::Composite { array_brackets: false }))
}
}
unsafe impl SqlTranslatable for crate::heap_tuple::PgHeapTuple<'static, AllocatedByRust> {
fn argument_sql() -> Result<SqlMapping, ArgumentError> {
Ok(SqlMapping::Composite { array_brackets: false })
}
fn return_sql() -> Result<Returns, ReturnsError> {
Ok(Returns::One(SqlMapping::Composite { array_brackets: false }))
}
}