1use crate::instance::ForeignServer;
5use crate::FdwRoutine;
6use pgrx::pg_sys::panic::ErrorReport;
7use pgrx::prelude::{Date, Interval, Time, Timestamp, TimestampWithTimeZone};
8use pgrx::{
9 datum::Uuid,
10 fcinfo,
11 pg_sys::{self, bytea, BuiltinOid, Datum, Oid},
12 AllocatedByRust, AnyNumeric, FromDatum, IntoDatum, JsonB, PgBuiltInOids, PgOid,
13};
14use std::collections::HashMap;
15use std::ffi::CStr;
16use std::fmt;
17use std::iter::Zip;
18use std::mem;
19use std::slice::Iter;
20
/// OID value of the built-in `ForeignDataWrapperRelationId`.
///
/// NOTE(review): presumably the OID of the `pg_foreign_data_wrapper` system
/// catalog, intended for comparison with the `catalog` argument of
/// `ForeignDataWrapper::validator` — confirm against the PostgreSQL headers.
pub const FOREIGN_DATA_WRAPPER_RELATION_ID: Oid = BuiltinOid::ForeignDataWrapperRelationId.value();

/// OID value of the built-in `ForeignServerRelationId`.
///
/// NOTE(review): presumably the OID of the `pg_foreign_server` system
/// catalog — confirm against the PostgreSQL headers.
pub const FOREIGN_SERVER_RELATION_ID: Oid = BuiltinOid::ForeignServerRelationId.value();

/// OID value of the built-in `ForeignTableRelationId`.
///
/// NOTE(review): presumably the OID of the `pg_foreign_table` system
/// catalog — confirm against the PostgreSQL headers.
pub const FOREIGN_TABLE_RELATION_ID: Oid = BuiltinOid::ForeignTableRelationId.value();
34
/// A single value exchanged between Postgres and a foreign data wrapper.
///
/// Each variant wraps the Rust representation of one supported Postgres
/// type; the type OID that produces each variant is the one matched in the
/// `FromDatum` implementation for `Cell`.
#[derive(Debug)]
pub enum Cell {
    /// Boolean value (built from `BOOLOID`).
    Bool(bool),
    /// 8-bit integer (built from `CHAROID`).
    I8(i8),
    /// 16-bit integer (built from `INT2OID`).
    I16(i16),
    /// 32-bit float (built from `FLOAT4OID`).
    F32(f32),
    /// 32-bit integer (built from `INT4OID`).
    I32(i32),
    /// 64-bit float (built from `FLOAT8OID`).
    F64(f64),
    /// 64-bit integer (built from `INT8OID`).
    I64(i64),
    /// Arbitrary precision number (built from `NUMERICOID`).
    Numeric(AnyNumeric),
    /// Text value (built from `TEXTOID`).
    String(String),
    /// Calendar date (built from `DATEOID`).
    Date(Date),
    /// Time of day (built from `TIMEOID`).
    Time(Time),
    /// Timestamp without time zone (built from `TIMESTAMPOID`).
    Timestamp(Timestamp),
    /// Timestamp with time zone (built from `TIMESTAMPTZOID`).
    Timestamptz(TimestampWithTimeZone),
    /// Time interval (built from `INTERVALOID`).
    Interval(Interval),
    /// JSONB document (built from `JSONBOID`).
    Json(JsonB),
    /// Raw pointer to a `bytea` value (built from `BYTEAOID`).
    ///
    /// The cell does not own the pointed-to data: cloning the cell copies
    /// only the pointer.
    Bytea(*mut bytea),
    /// UUID value (built from `UUIDOID`).
    Uuid(Uuid),
    /// Array of nullable booleans (built from `BOOLARRAYOID`).
    BoolArray(Vec<Option<bool>>),
    /// Array of nullable 16-bit integers (built from `INT2ARRAYOID`).
    I16Array(Vec<Option<i16>>),
    /// Array of nullable 32-bit integers (built from `INT4ARRAYOID`).
    I32Array(Vec<Option<i32>>),
    /// Array of nullable 64-bit integers (built from `INT8ARRAYOID`).
    I64Array(Vec<Option<i64>>),
    /// Array of nullable 32-bit floats (built from `FLOAT4ARRAYOID`).
    F32Array(Vec<Option<f32>>),
    /// Array of nullable 64-bit floats (built from `FLOAT8ARRAYOID`).
    F64Array(Vec<Option<f64>>),
    /// Array of nullable text values (built from `TEXTARRAYOID`).
    StringArray(Vec<Option<String>>),
}
63
64impl Cell {
65 pub fn is_array(&self) -> bool {
67 matches!(
68 self,
69 Cell::BoolArray(_)
70 | Cell::I16Array(_)
71 | Cell::I32Array(_)
72 | Cell::I64Array(_)
73 | Cell::F32Array(_)
74 | Cell::F64Array(_)
75 | Cell::StringArray(_)
76 )
77 }
78}
79
// `Cell::Bytea` stores a raw `*mut bytea`, so `Cell` is not `Send`
// automatically; this impl asserts it by hand.
// NOTE(review): soundness depends on the pointed-to `bytea` never being
// accessed concurrently from several threads — confirm against the callers.
unsafe impl Send for Cell {}
81
82impl Clone for Cell {
83 fn clone(&self) -> Self {
84 match self {
85 Cell::Bool(v) => Cell::Bool(*v),
86 Cell::I8(v) => Cell::I8(*v),
87 Cell::I16(v) => Cell::I16(*v),
88 Cell::F32(v) => Cell::F32(*v),
89 Cell::I32(v) => Cell::I32(*v),
90 Cell::F64(v) => Cell::F64(*v),
91 Cell::I64(v) => Cell::I64(*v),
92 Cell::Numeric(v) => Cell::Numeric(v.clone()),
93 Cell::String(v) => Cell::String(v.clone()),
94 Cell::Date(v) => Cell::Date(*v),
95 Cell::Time(v) => Cell::Time(*v),
96 Cell::Timestamp(v) => Cell::Timestamp(*v),
97 Cell::Timestamptz(v) => Cell::Timestamptz(*v),
98 Cell::Interval(v) => Cell::Interval(*v),
99 Cell::Json(v) => Cell::Json(JsonB(v.0.clone())),
100 Cell::Bytea(v) => Cell::Bytea(*v),
101 Cell::Uuid(v) => Cell::Uuid(*v),
102 Cell::BoolArray(v) => Cell::BoolArray(v.clone()),
103 Cell::I16Array(v) => Cell::I16Array(v.clone()),
104 Cell::I32Array(v) => Cell::I32Array(v.clone()),
105 Cell::I64Array(v) => Cell::I64Array(v.clone()),
106 Cell::F32Array(v) => Cell::F32Array(v.clone()),
107 Cell::F64Array(v) => Cell::F64Array(v.clone()),
108 Cell::StringArray(v) => Cell::StringArray(v.clone()),
109 }
110 }
111}
112
/// Writes `array` to `f` as a bracketed, comma-separated list.
///
/// `Some` elements are rendered with their `Display` implementation and
/// `None` elements as the literal `null`, e.g. `[1,null,3]`; an empty slice
/// renders as `[]`.
///
/// Elements are written straight into the formatter, so no intermediate
/// strings are allocated and a failing element `Display` implementation is
/// reported through the returned `fmt::Result` instead of panicking.
fn write_array<T: std::fmt::Display>(
    array: &[Option<T>],
    f: &mut fmt::Formatter<'_>,
) -> fmt::Result {
    f.write_str("[")?;
    for (idx, item) in array.iter().enumerate() {
        // Separator goes between elements only, never before the first one.
        if idx > 0 {
            f.write_str(",")?;
        }
        match item {
            Some(val) => write!(f, "{}", val)?,
            None => f.write_str("null")?,
        }
    }
    f.write_str("]")
}
127
128impl fmt::Display for Cell {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 match self {
131 Cell::Bool(v) => write!(f, "{}", v),
132 Cell::I8(v) => write!(f, "{}", v),
133 Cell::I16(v) => write!(f, "{}", v),
134 Cell::F32(v) => write!(f, "{}", v),
135 Cell::I32(v) => write!(f, "{}", v),
136 Cell::F64(v) => write!(f, "{}", v),
137 Cell::I64(v) => write!(f, "{}", v),
138 Cell::Numeric(v) => write!(f, "{}", v),
139 Cell::String(v) => write!(f, "'{}'", v),
140 Cell::Date(v) => unsafe {
141 let dt =
142 fcinfo::direct_function_call_as_datum(pg_sys::date_out, &[(*v).into_datum()])
143 .expect("cell should be a valid date");
144 let dt_cstr = CStr::from_ptr(dt.cast_mut_ptr());
145 write!(
146 f,
147 "'{}'",
148 dt_cstr.to_str().expect("date should be a valid string")
149 )
150 },
151 Cell::Time(v) => unsafe {
152 let ts =
153 fcinfo::direct_function_call_as_datum(pg_sys::time_out, &[(*v).into_datum()])
154 .expect("cell should be a valid time");
155 let ts_cstr = CStr::from_ptr(ts.cast_mut_ptr());
156 write!(
157 f,
158 "'{}'",
159 ts_cstr.to_str().expect("time hould be a valid string")
160 )
161 },
162 Cell::Timestamp(v) => unsafe {
163 let ts = fcinfo::direct_function_call_as_datum(
164 pg_sys::timestamp_out,
165 &[(*v).into_datum()],
166 )
167 .expect("cell should be a valid timestamp");
168 let ts_cstr = CStr::from_ptr(ts.cast_mut_ptr());
169 write!(
170 f,
171 "'{}'",
172 ts_cstr
173 .to_str()
174 .expect("timestamp should be a valid string")
175 )
176 },
177 Cell::Timestamptz(v) => unsafe {
178 let ts = fcinfo::direct_function_call_as_datum(
179 pg_sys::timestamptz_out,
180 &[(*v).into_datum()],
181 )
182 .expect("cell should be a valid timestamptz");
183 let ts_cstr = CStr::from_ptr(ts.cast_mut_ptr());
184 write!(
185 f,
186 "'{}'",
187 ts_cstr
188 .to_str()
189 .expect("timestamptz should be a valid string")
190 )
191 },
192 Cell::Interval(v) => write!(f, "{}", v),
193 Cell::Json(v) => write!(f, "{:?}", v),
194 Cell::Bytea(v) => {
195 let byte_u8 = unsafe { pgrx::varlena::varlena_to_byte_slice(*v) };
196 let hex = byte_u8
197 .iter()
198 .map(|b| format!("{:02X}", b))
199 .collect::<Vec<String>>()
200 .join("");
201 if hex.is_empty() {
202 write!(f, "''")
203 } else {
204 write!(f, r#"'\x{}'"#, hex)
205 }
206 }
207 Cell::Uuid(v) => write!(f, "'{}'", v),
208 Cell::BoolArray(v) => write_array(v, f),
209 Cell::I16Array(v) => write_array(v, f),
210 Cell::I32Array(v) => write_array(v, f),
211 Cell::I64Array(v) => write_array(v, f),
212 Cell::F32Array(v) => write_array(v, f),
213 Cell::F64Array(v) => write_array(v, f),
214 Cell::StringArray(v) => write_array(v, f),
215 }
216 }
217}
218
219impl IntoDatum for Cell {
220 fn into_datum(self) -> Option<Datum> {
221 match self {
222 Cell::Bool(v) => v.into_datum(),
223 Cell::I8(v) => v.into_datum(),
224 Cell::I16(v) => v.into_datum(),
225 Cell::F32(v) => v.into_datum(),
226 Cell::I32(v) => v.into_datum(),
227 Cell::F64(v) => v.into_datum(),
228 Cell::I64(v) => v.into_datum(),
229 Cell::Numeric(v) => v.into_datum(),
230 Cell::String(v) => v.into_datum(),
231 Cell::Date(v) => v.into_datum(),
232 Cell::Time(v) => v.into_datum(),
233 Cell::Timestamp(v) => v.into_datum(),
234 Cell::Timestamptz(v) => v.into_datum(),
235 Cell::Interval(v) => v.into_datum(),
236 Cell::Json(v) => v.into_datum(),
237 Cell::Bytea(v) => Some(Datum::from(v)),
238 Cell::Uuid(v) => v.into_datum(),
239 Cell::BoolArray(v) => v.into_datum(),
240 Cell::I16Array(v) => v.into_datum(),
241 Cell::I32Array(v) => v.into_datum(),
242 Cell::I64Array(v) => v.into_datum(),
243 Cell::F32Array(v) => v.into_datum(),
244 Cell::F64Array(v) => v.into_datum(),
245 Cell::StringArray(v) => v.into_datum(),
246 }
247 }
248
249 fn type_oid() -> Oid {
250 Oid::INVALID
251 }
252
253 fn is_compatible_with(other: Oid) -> bool {
254 Self::type_oid() == other
255 || other == pg_sys::BOOLOID
256 || other == pg_sys::CHAROID
257 || other == pg_sys::INT2OID
258 || other == pg_sys::FLOAT4OID
259 || other == pg_sys::INT4OID
260 || other == pg_sys::FLOAT8OID
261 || other == pg_sys::INT8OID
262 || other == pg_sys::NUMERICOID
263 || other == pg_sys::TEXTOID
264 || other == pg_sys::DATEOID
265 || other == pg_sys::TIMEOID
266 || other == pg_sys::TIMESTAMPOID
267 || other == pg_sys::TIMESTAMPTZOID
268 || other == pg_sys::INTERVALOID
269 || other == pg_sys::JSONBOID
270 || other == pg_sys::BYTEAOID
271 || other == pg_sys::UUIDOID
272 || other == pg_sys::BOOLARRAYOID
273 || other == pg_sys::INT2ARRAYOID
274 || other == pg_sys::INT4ARRAYOID
275 || other == pg_sys::INT8ARRAYOID
276 || other == pg_sys::FLOAT4ARRAYOID
277 || other == pg_sys::FLOAT8ARRAYOID
278 || other == pg_sys::TEXTARRAYOID
279 }
280}
281
282impl FromDatum for Cell {
283 unsafe fn from_polymorphic_datum(datum: Datum, is_null: bool, typoid: Oid) -> Option<Self>
284 where
285 Self: Sized,
286 {
287 let oid = PgOid::from(typoid);
288 match oid {
289 PgOid::BuiltIn(PgBuiltInOids::BOOLOID) => {
290 bool::from_datum(datum, is_null).map(Cell::Bool)
291 }
292 PgOid::BuiltIn(PgBuiltInOids::CHAROID) => i8::from_datum(datum, is_null).map(Cell::I8),
293 PgOid::BuiltIn(PgBuiltInOids::INT2OID) => {
294 i16::from_datum(datum, is_null).map(Cell::I16)
295 }
296 PgOid::BuiltIn(PgBuiltInOids::FLOAT4OID) => {
297 f32::from_datum(datum, is_null).map(Cell::F32)
298 }
299 PgOid::BuiltIn(PgBuiltInOids::INT4OID) => {
300 i32::from_datum(datum, is_null).map(Cell::I32)
301 }
302 PgOid::BuiltIn(PgBuiltInOids::FLOAT8OID) => {
303 f64::from_datum(datum, is_null).map(Cell::F64)
304 }
305 PgOid::BuiltIn(PgBuiltInOids::INT8OID) => {
306 i64::from_datum(datum, is_null).map(Cell::I64)
307 }
308 PgOid::BuiltIn(PgBuiltInOids::NUMERICOID) => {
309 AnyNumeric::from_datum(datum, is_null).map(Cell::Numeric)
310 }
311 PgOid::BuiltIn(PgBuiltInOids::TEXTOID) => {
312 String::from_datum(datum, is_null).map(Cell::String)
313 }
314 PgOid::BuiltIn(PgBuiltInOids::DATEOID) => {
315 Date::from_datum(datum, is_null).map(Cell::Date)
316 }
317 PgOid::BuiltIn(PgBuiltInOids::TIMEOID) => {
318 Time::from_datum(datum, is_null).map(Cell::Time)
319 }
320 PgOid::BuiltIn(PgBuiltInOids::TIMESTAMPOID) => {
321 Timestamp::from_datum(datum, is_null).map(Cell::Timestamp)
322 }
323 PgOid::BuiltIn(PgBuiltInOids::TIMESTAMPTZOID) => {
324 TimestampWithTimeZone::from_datum(datum, is_null).map(Cell::Timestamptz)
325 }
326 PgOid::BuiltIn(PgBuiltInOids::INTERVALOID) => {
327 Interval::from_datum(datum, is_null).map(Cell::Interval)
328 }
329 PgOid::BuiltIn(PgBuiltInOids::JSONBOID) => {
330 JsonB::from_datum(datum, is_null).map(Cell::Json)
331 }
332 PgOid::BuiltIn(PgBuiltInOids::BYTEAOID) => {
333 Some(Cell::Bytea(datum.cast_mut_ptr::<bytea>()))
334 }
335 PgOid::BuiltIn(PgBuiltInOids::UUIDOID) => {
336 Uuid::from_datum(datum, is_null).map(Cell::Uuid)
337 }
338 PgOid::BuiltIn(PgBuiltInOids::BOOLARRAYOID) => {
339 Vec::<Option<bool>>::from_datum(datum, false).map(Cell::BoolArray)
340 }
341 PgOid::BuiltIn(PgBuiltInOids::INT2ARRAYOID) => {
342 Vec::<Option<i16>>::from_datum(datum, false).map(Cell::I16Array)
343 }
344 PgOid::BuiltIn(PgBuiltInOids::INT4ARRAYOID) => {
345 Vec::<Option<i32>>::from_datum(datum, false).map(Cell::I32Array)
346 }
347 PgOid::BuiltIn(PgBuiltInOids::INT8ARRAYOID) => {
348 Vec::<Option<i64>>::from_datum(datum, false).map(Cell::I64Array)
349 }
350 PgOid::BuiltIn(PgBuiltInOids::FLOAT4ARRAYOID) => {
351 Vec::<Option<f32>>::from_datum(datum, false).map(Cell::F32Array)
352 }
353 PgOid::BuiltIn(PgBuiltInOids::FLOAT8ARRAYOID) => {
354 Vec::<Option<f64>>::from_datum(datum, false).map(Cell::F64Array)
355 }
356 PgOid::BuiltIn(PgBuiltInOids::TEXTARRAYOID) => {
357 Vec::<Option<String>>::from_datum(datum, false).map(Cell::StringArray)
358 }
359 _ => None,
360 }
361 }
362}
363
/// Strategy for turning a [`Cell`] into text.
///
/// Passed to `Qual::deparse_with_fmt` to control how the value side of a
/// condition is rendered.
pub trait CellFormatter {
    /// Returns the textual form of `cell`.
    ///
    /// Takes `&mut self`, so an implementation may keep state between calls.
    fn fmt_cell(&mut self, cell: &Cell) -> String;
}
367
368struct DefaultFormatter {}
369
370impl DefaultFormatter {
371 fn new() -> Self {
372 Self {}
373 }
374}
375
376impl CellFormatter for DefaultFormatter {
377 fn fmt_cell(&mut self, cell: &Cell) -> String {
378 format!("{}", cell)
379 }
380}
381
382#[derive(Debug, Clone, Default)]
387pub struct Row {
388 pub cols: Vec<String>,
390
391 pub cells: Vec<Option<Cell>>,
393}
394
395impl Row {
396 pub fn new() -> Self {
398 Self::default()
399 }
400
401 pub fn push(&mut self, col: &str, cell: Option<Cell>) {
403 self.cols.push(col.to_owned());
404 self.cells.push(cell);
405 }
406
407 pub fn iter(&self) -> Zip<Iter<'_, String>, Iter<'_, Option<Cell>>> {
409 self.cols.iter().zip(self.cells.iter())
410 }
411
412 pub fn retain<F>(&mut self, f: F)
414 where
415 F: FnMut((&String, &Option<Cell>)) -> bool,
416 {
417 let keep: Vec<bool> = self.iter().map(f).collect();
418 let mut iter = keep.iter();
419 self.cols.retain(|_| *iter.next().unwrap_or(&true));
420 iter = keep.iter();
421 self.cells.retain(|_| *iter.next().unwrap_or(&true));
422 }
423
424 #[inline]
426 pub fn replace_with(&mut self, src: Row) {
427 let _ = mem::replace(self, src);
428 }
429
430 pub fn clear(&mut self) {
432 self.cols.clear();
433 self.cells.clear();
434 }
435}
436
/// Description of a single column.
#[derive(Debug, Clone, Default)]
pub struct Column {
    /// Column name.
    pub name: String,

    /// Column number.
    /// NOTE(review): presumably the 1-based attribute number — confirm against the code that builds this struct.
    pub num: usize,

    /// OID of the column's type.
    pub type_oid: Oid,
}
451
/// Right-hand side of a [`Qual`]: a single cell or a list of cells.
#[derive(Debug, Clone)]
pub enum Value {
    /// A single value.
    Cell(Cell),
    /// A list of values; `Qual::deparse_with_fmt` expects this variant when
    /// the qual's `use_or` flag is set.
    Array(Vec<Cell>),
}
458
/// Description of a query parameter attached to a [`Qual`].
/// NOTE(review): how `id` is assigned and resolved is not visible here — confirm against the code that builds quals.
#[derive(Debug, Clone)]
pub struct Param {
    /// Parameter identifier.
    pub id: usize,

    /// OID of the parameter's type.
    pub type_oid: Oid,
}
468
469#[derive(Debug, Clone)]
513pub struct Qual {
514 pub field: String,
515 pub operator: String,
516 pub value: Value,
517 pub use_or: bool,
518 pub param: Option<Param>,
519}
520
521impl Qual {
522 pub fn deparse(&self) -> String {
523 let mut formatter = DefaultFormatter::new();
524 self.deparse_with_fmt(&mut formatter)
525 }
526
527 pub fn deparse_with_fmt<T: CellFormatter>(&self, t: &mut T) -> String {
528 if self.use_or {
529 match &self.value {
530 Value::Cell(_) => unreachable!(),
531 Value::Array(cells) => {
532 let conds: Vec<String> = cells
533 .iter()
534 .map(|cell| {
535 format!("{} {} {}", self.field, self.operator, t.fmt_cell(cell))
536 })
537 .collect();
538 conds.join(" or ")
539 }
540 }
541 } else {
542 match &self.value {
543 Value::Cell(cell) => match self.operator.as_str() {
544 "is" | "is not" => match cell {
545 Cell::String(cell) if cell == "null" => {
546 format!("{} {} null", self.field, self.operator)
547 }
548 _ => format!("{} {} {}", self.field, self.operator, t.fmt_cell(cell)),
549 },
550 "~~" => format!("{} like {}", self.field, t.fmt_cell(cell)),
551 "!~~" => format!("{} not like {}", self.field, t.fmt_cell(cell)),
552 _ => format!("{} {} {}", self.field, self.operator, t.fmt_cell(cell)),
553 },
554 Value::Array(_) => unreachable!(),
555 }
556 }
557 }
558}
559
/// A single sort key.
#[derive(Debug, Clone, Default)]
pub struct Sort {
    /// Name of the field to sort by.
    pub field: String,
    /// Number of the field; not used when deparsing.
    pub field_no: usize,
    /// `true` renders `desc`, `false` renders `asc`.
    pub reversed: bool,
    /// `true` renders `nulls first`, `false` renders `nulls last`.
    pub nulls_first: bool,
    /// Optional collation, appended by `deparse_with_collate`.
    pub collate: Option<String>,
}

impl Sort {
    /// Renders the sort key as `<field> <asc|desc> nulls <first|last>`.
    pub fn deparse(&self) -> String {
        let direction = if self.reversed { "desc" } else { "asc" };
        let nulls = if self.nulls_first {
            "nulls first"
        } else {
            "nulls last"
        };

        let mut sql = String::with_capacity(self.field.len() + 1 + direction.len() + 1 + nulls.len());
        sql.push_str(&self.field);
        sql.push(' ');
        sql.push_str(direction);
        sql.push(' ');
        sql.push_str(nulls);
        sql
    }

    /// Renders the sort key like `deparse`, followed by ` collate <name>`
    /// when a collation is set.
    pub fn deparse_with_collate(&self) -> String {
        let base = self.deparse();
        match self.collate.as_deref() {
            Some(name) => format!("{} collate {}", base, name),
            None => base,
        }
    }
}
624
/// A row-count limit with an offset.
#[derive(Debug, Clone, Default)]
pub struct Limit {
    /// Value rendered after `limit`.
    pub count: i64,
    /// Value rendered after `offset`.
    pub offset: i64,
}

impl Limit {
    /// Renders the limit as `limit <count> offset <offset>`.
    pub fn deparse(&self) -> String {
        let Limit { count, offset } = self;
        format!("limit {} offset {}", count, offset)
    }
}
649
/// Interface implemented by a foreign data wrapper.
///
/// `E` is the wrapper's error type and must be convertible into an
/// [`ErrorReport`]. Only `new`, `begin_scan`, `iter_scan` and `end_scan` are
/// required; every other method has a default implementation.
pub trait ForeignDataWrapper<E: Into<ErrorReport>> {
    /// Creates a wrapper instance for the given foreign `server`.
    fn new(server: ForeignServer) -> Result<Self, E>
    where
        Self: Sized;

    /// Reports size estimates for the relation; the default returns `(0, 0)`.
    ///
    /// NOTE(review): the tuple is presumably (estimated row count, average
    /// row width) — confirm against the caller in the `scan` module.
    fn get_rel_size(
        &mut self,
        _quals: &[Qual],
        _columns: &[Column],
        _sorts: &[Sort],
        _limit: &Option<Limit>,
        _options: &HashMap<String, String>,
    ) -> Result<(i64, i32), E> {
        Ok((0, 0))
    }

    /// Starts a scan, receiving the filter conditions, requested columns,
    /// sort keys, optional limit and a map of options.
    ///
    /// NOTE(review): `options` are presumably the foreign table options —
    /// confirm against the caller.
    fn begin_scan(
        &mut self,
        quals: &[Qual],
        columns: &[Column],
        sorts: &[Sort],
        limit: &Option<Limit>,
        options: &HashMap<String, String>,
    ) -> Result<(), E>;

    /// Produces the next row of the scan into `row`.
    ///
    /// NOTE(review): presumably `Some(())` means a row was written and
    /// `None` means the scan is exhausted — confirm against the caller.
    fn iter_scan(&mut self, row: &mut Row) -> Result<Option<()>, E>;

    /// Restarts the scan; the default does nothing.
    fn re_scan(&mut self) -> Result<(), E> {
        Ok(())
    }

    /// Finishes the scan.
    fn end_scan(&mut self) -> Result<(), E>;

    /// Starts a modification; the default does nothing.
    fn begin_modify(&mut self, _options: &HashMap<String, String>) -> Result<(), E> {
        Ok(())
    }

    /// Inserts a row; the default does nothing.
    fn insert(&mut self, _row: &Row) -> Result<(), E> {
        Ok(())
    }

    /// Updates the row identified by `_rowid` with `_new_row`; the default
    /// does nothing.
    fn update(&mut self, _rowid: &Cell, _new_row: &Row) -> Result<(), E> {
        Ok(())
    }

    /// Deletes the row identified by `_rowid`; the default does nothing.
    fn delete(&mut self, _rowid: &Cell) -> Result<(), E> {
        Ok(())
    }

    /// Finishes a modification; the default does nothing.
    fn end_modify(&mut self) -> Result<(), E> {
        Ok(())
    }

    /// Handles a schema import request; the default returns an empty list.
    ///
    /// NOTE(review): the returned strings are presumably the statements that
    /// create the imported foreign tables — confirm against the caller.
    fn import_foreign_schema(
        &mut self,
        _stmt: crate::import_foreign_schema::ImportForeignSchemaStmt,
    ) -> Result<Vec<String>, E> {
        Ok(Vec::new())
    }

    /// Builds the `FdwRoutine` node whose callbacks dispatch to this
    /// wrapper type.
    ///
    /// Every callback is set to the matching generic function from the
    /// `import_foreign_schema`, `scan` and `modify` modules, then
    /// `fdw_routine_hook` is given a chance to adjust the routine before it
    /// is converted with `into_pg_boxed`.
    fn fdw_routine() -> FdwRoutine
    where
        Self: Sized,
    {
        unsafe {
            use crate::{import_foreign_schema, modify, scan};
            let mut fdw_routine =
                FdwRoutine::<AllocatedByRust>::alloc_node(pg_sys::NodeTag::T_FdwRoutine);

            // Schema import callback.
            fdw_routine.ImportForeignSchema =
                Some(import_foreign_schema::import_foreign_schema::<E, Self>);

            // Planning and explain callbacks.
            fdw_routine.GetForeignRelSize = Some(scan::get_foreign_rel_size::<E, Self>);
            fdw_routine.GetForeignPaths = Some(scan::get_foreign_paths::<E, Self>);
            fdw_routine.GetForeignPlan = Some(scan::get_foreign_plan::<E, Self>);
            fdw_routine.ExplainForeignScan = Some(scan::explain_foreign_scan::<E, Self>);

            // Scan execution callbacks.
            fdw_routine.BeginForeignScan = Some(scan::begin_foreign_scan::<E, Self>);
            fdw_routine.IterateForeignScan = Some(scan::iterate_foreign_scan::<E, Self>);
            fdw_routine.ReScanForeignScan = Some(scan::re_scan_foreign_scan::<E, Self>);
            fdw_routine.EndForeignScan = Some(scan::end_foreign_scan::<E, Self>);

            // Data modification callbacks.
            fdw_routine.AddForeignUpdateTargets = Some(modify::add_foreign_update_targets);
            fdw_routine.PlanForeignModify = Some(modify::plan_foreign_modify::<E, Self>);
            fdw_routine.BeginForeignModify = Some(modify::begin_foreign_modify::<E, Self>);
            fdw_routine.ExecForeignInsert = Some(modify::exec_foreign_insert::<E, Self>);
            fdw_routine.ExecForeignDelete = Some(modify::exec_foreign_delete::<E, Self>);
            fdw_routine.ExecForeignUpdate = Some(modify::exec_foreign_update::<E, Self>);
            fdw_routine.EndForeignModify = Some(modify::end_foreign_modify::<E, Self>);

            // Let the implementation override or extend the callbacks.
            Self::fdw_routine_hook(&mut fdw_routine);
            fdw_routine.into_pg_boxed()
        }
    }

    /// Hook called by `fdw_routine` with the populated routine before it is
    /// returned; the default does nothing.
    fn fdw_routine_hook(_routine: &mut FdwRoutine<AllocatedByRust>) {}

    /// Validates a list of options; the default accepts everything.
    ///
    /// NOTE(review): `_catalog` is presumably the OID of the catalog the
    /// options belong to (compare with the `*_RELATION_ID` constants in this
    /// file) — confirm against the caller.
    fn validator(_options: Vec<Option<String>>, _catalog: Option<Oid>) -> Result<(), E> {
        Ok(())
    }
}