datafusion_remote_table/connection/
oracle.rs

1use crate::connection::{RemoteDbType, just_return, projections_contains};
2use crate::utils::big_decimal_to_i128;
3use crate::{
4    Connection, ConnectionOptions, DFResult, Literalize, OracleConnectionOptions, OracleType, Pool,
5    PoolState, RemoteField, RemoteSchema, RemoteSchemaRef, RemoteSource, RemoteType,
6};
7use arrow::array::{
8    ArrayRef, BinaryBuilder, BinaryViewBuilder, BooleanBuilder, Date64Builder, Decimal128Builder,
9    Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, LargeBinaryBuilder,
10    LargeStringBuilder, RecordBatch, RecordBatchOptions, StringBuilder, StringViewBuilder,
11    StructBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder, make_builder,
12};
13use arrow::datatypes::{DataType, Fields, SchemaRef, TimeUnit};
14use bb8_oracle::OracleConnectionManager;
15use datafusion_common::{DataFusionError, project_schema};
16use datafusion_execution::SendableRecordBatchStream;
17use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
18use futures::StreamExt;
19use log::debug;
20use oracle::sql_type::{Object, OracleType as ColumnType};
21use oracle::{Connector, Row};
22use std::any::Any;
23use std::sync::Arc;
24
25#[derive(Debug)]
26pub struct OraclePool {
27    pool: bb8::Pool<OracleConnectionManager>,
28}
29
30pub(crate) async fn connect_oracle(options: &OracleConnectionOptions) -> DFResult<OraclePool> {
31    let connect_string = format!(
32        "//{}:{}/{}",
33        options.host, options.port, options.service_name
34    );
35    let connector = Connector::new(
36        options.username.clone(),
37        options.password.clone(),
38        connect_string,
39    );
40    let _ = connector
41        .connect()
42        .map_err(|e| DataFusionError::Internal(format!("Failed to connect to oracle: {e:?}")))?;
43    let manager = OracleConnectionManager::from_connector(connector);
44    let pool = bb8::Pool::builder()
45        .max_size(options.pool_max_size as u32)
46        .min_idle(Some(options.pool_min_idle as u32))
47        .idle_timeout(Some(options.pool_idle_timeout))
48        .reaper_rate(options.pool_ttl_check_interval)
49        .build(manager)
50        .await
51        .map_err(|e| DataFusionError::Internal(format!("Failed to create oracle pool: {e:?}")))?;
52    Ok(OraclePool { pool })
53}
54
55#[async_trait::async_trait]
56impl Pool for OraclePool {
57    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
58        let conn = self.pool.get_owned().await.map_err(|e| {
59            DataFusionError::Execution(format!("Failed to get oracle connection due to {e:?}"))
60        })?;
61        Ok(Arc::new(OracleConnection { conn }))
62    }
63
64    async fn state(&self) -> DFResult<PoolState> {
65        let bb8_state = self.pool.state();
66        Ok(PoolState {
67            connections: bb8_state.connections as usize,
68            idle_connections: bb8_state.idle_connections as usize,
69        })
70    }
71}
72
73#[derive(Debug)]
74pub struct OracleConnection {
75    conn: bb8::PooledConnection<'static, OracleConnectionManager>,
76}
77
78#[async_trait::async_trait]
79impl Connection for OracleConnection {
80    fn as_any(&self) -> &dyn Any {
81        self
82    }
83
84    async fn infer_schema(&self, source: &RemoteSource) -> DFResult<RemoteSchemaRef> {
85        let sql = RemoteDbType::Oracle.limit_1_query_if_possible(source);
86        let result_set = self.conn.query(&sql, &[]).map_err(|e| {
87            DataFusionError::Plan(format!("Failed to execute query {sql} on oracle: {e:?}"))
88        })?;
89        let remote_schema = Arc::new(build_remote_schema(&result_set)?);
90        Ok(remote_schema)
91    }
92
93    async fn query(
94        &self,
95        conn_options: &ConnectionOptions,
96        source: &RemoteSource,
97        table_schema: SchemaRef,
98        projection: Option<&Vec<usize>>,
99        unparsed_filters: &[String],
100        limit: Option<usize>,
101    ) -> DFResult<SendableRecordBatchStream> {
102        let projected_schema = project_schema(&table_schema, projection)?;
103
104        let sql = RemoteDbType::Oracle.rewrite_query(source, unparsed_filters, limit);
105        debug!("[remote-table] executing oracle query: {sql}");
106
107        let projection = projection.cloned();
108        let chunk_size = conn_options.stream_chunk_size();
109        let result_set = self.conn.query(&sql, &[]).map_err(|e| {
110            DataFusionError::Execution(format!("Failed to execute query on oracle: {e:?}"))
111        })?;
112        let stream = futures::stream::iter(result_set).chunks(chunk_size).boxed();
113
114        let stream = stream.map(move |rows| {
115            let rows: Vec<Row> = rows
116                .into_iter()
117                .collect::<Result<Vec<_>, _>>()
118                .map_err(|e| {
119                    DataFusionError::Execution(format!(
120                        "Failed to collect rows from oracle due to {e}",
121                    ))
122                })?;
123            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
124        });
125
126        Ok(Box::pin(RecordBatchStreamAdapter::new(
127            projected_schema,
128            stream,
129        )))
130    }
131
132    async fn insert(
133        &self,
134        _conn_options: &ConnectionOptions,
135        _literalizer: Arc<dyn Literalize>,
136        _table: &[String],
137        _remote_schema: RemoteSchemaRef,
138        _batch: RecordBatch,
139    ) -> DFResult<usize> {
140        Err(DataFusionError::Execution(
141            "Insert operation is not supported for oracle".to_string(),
142        ))
143    }
144}
145
146fn oracle_type_to_remote_type(oracle_type: &ColumnType) -> DFResult<OracleType> {
147    match oracle_type {
148        ColumnType::Number(precision, scale) => {
149            // TODO need more investigation on the precision and scale
150            let precision = if *precision == 0 { 38 } else { *precision };
151            let scale = if *scale == -127 { 0 } else { *scale };
152            Ok(OracleType::Number(precision, scale))
153        }
154        ColumnType::BinaryFloat => Ok(OracleType::BinaryFloat),
155        ColumnType::BinaryDouble => Ok(OracleType::BinaryDouble),
156        ColumnType::Float(precision) => Ok(OracleType::Float(*precision)),
157        ColumnType::Varchar2(size) => Ok(OracleType::Varchar2(*size)),
158        ColumnType::NVarchar2(size) => Ok(OracleType::NVarchar2(*size)),
159        ColumnType::Char(size) => Ok(OracleType::Char(*size)),
160        ColumnType::NChar(size) => Ok(OracleType::NChar(*size)),
161        ColumnType::Long => Ok(OracleType::Long),
162        ColumnType::CLOB => Ok(OracleType::Clob),
163        ColumnType::NCLOB => Ok(OracleType::NClob),
164        ColumnType::Raw(size) => Ok(OracleType::Raw(*size)),
165        ColumnType::LongRaw => Ok(OracleType::LongRaw),
166        ColumnType::BLOB => Ok(OracleType::Blob),
167        ColumnType::Date => Ok(OracleType::Date),
168        ColumnType::Timestamp(_) => Ok(OracleType::Timestamp),
169        ColumnType::Boolean => Ok(OracleType::Boolean),
170        ColumnType::Object(_) => Ok(OracleType::SdeGeometry),
171        _ => Err(DataFusionError::NotImplemented(format!(
172            "Unsupported oracle type: {oracle_type:?}",
173        ))),
174    }
175}
176
177fn build_remote_schema(result_set: &oracle::ResultSet<Row>) -> DFResult<RemoteSchema> {
178    let mut remote_fields = vec![];
179    for col in result_set.column_info() {
180        let remote_type = RemoteType::Oracle(oracle_type_to_remote_type(col.oracle_type())?);
181        remote_fields.push(RemoteField::new(col.name(), remote_type, col.nullable()));
182    }
183    Ok(RemoteSchema::new(remote_fields))
184}
185
/// Reads column `$index` of `$row` as `Option<$value_ty>` and appends it to `$builder`,
/// downcast to `$builder_ty`, after passing non-null values through `$convert`.
///
/// `$convert` must return a `Result`; its error and row-read errors are propagated with
/// `?` from the enclosing function. Panics if the builder is not a `$builder_ty`.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let Some(typed_builder) = $builder.as_any_mut().downcast_mut::<$builder_ty>() else {
            panic!(
                "Failed to downcast builder to {} for {:?} and {:?}",
                stringify!($builder_ty),
                $field,
                $col
            );
        };
        let cell: Option<$value_ty> =
            $row.get::<usize, Option<$value_ty>>($index).map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to get {} value for {:?} and {:?}: {e:?}",
                    stringify!($value_ty),
                    $field,
                    $col
                ))
            })?;

        if let Some(value) = cell {
            typed_builder.append_value($convert(value)?);
        } else {
            typed_builder.append_null();
        }
    }};
}
214
215fn rows_to_batch(
216    rows: &[Row],
217    table_schema: &SchemaRef,
218    projection: Option<&Vec<usize>>,
219) -> DFResult<RecordBatch> {
220    let projected_schema = project_schema(table_schema, projection)?;
221    let mut array_builders = vec![];
222    for field in table_schema.fields() {
223        let builder = make_builder(field.data_type(), rows.len());
224        array_builders.push(builder);
225    }
226
227    for row in rows {
228        for (idx, field) in table_schema.fields.iter().enumerate() {
229            if !projections_contains(projection, idx) {
230                continue;
231            }
232            let builder = &mut array_builders[idx];
233            let col = row.column_info().get(idx);
234            match field.data_type() {
235                DataType::Int16 => {
236                    handle_primitive_type!(
237                        builder,
238                        field,
239                        col,
240                        Int16Builder,
241                        i16,
242                        row,
243                        idx,
244                        just_return
245                    );
246                }
247                DataType::Int32 => {
248                    handle_primitive_type!(
249                        builder,
250                        field,
251                        col,
252                        Int32Builder,
253                        i32,
254                        row,
255                        idx,
256                        just_return
257                    );
258                }
259                DataType::Int64 => {
260                    handle_primitive_type!(
261                        builder,
262                        field,
263                        col,
264                        Int64Builder,
265                        i64,
266                        row,
267                        idx,
268                        just_return
269                    );
270                }
271                DataType::Float32 => {
272                    handle_primitive_type!(
273                        builder,
274                        field,
275                        col,
276                        Float32Builder,
277                        f32,
278                        row,
279                        idx,
280                        just_return
281                    );
282                }
283                DataType::Float64 => {
284                    handle_primitive_type!(
285                        builder,
286                        field,
287                        col,
288                        Float64Builder,
289                        f64,
290                        row,
291                        idx,
292                        just_return
293                    );
294                }
295                DataType::Utf8 => {
296                    handle_primitive_type!(
297                        builder,
298                        field,
299                        col,
300                        StringBuilder,
301                        String,
302                        row,
303                        idx,
304                        just_return
305                    );
306                }
307                DataType::LargeUtf8 => {
308                    handle_primitive_type!(
309                        builder,
310                        field,
311                        col,
312                        LargeStringBuilder,
313                        String,
314                        row,
315                        idx,
316                        just_return
317                    );
318                }
319                DataType::Utf8View => {
320                    handle_primitive_type!(
321                        builder,
322                        field,
323                        col,
324                        StringViewBuilder,
325                        String,
326                        row,
327                        idx,
328                        just_return
329                    );
330                }
331                DataType::Decimal128(_precision, scale) => {
332                    handle_primitive_type!(
333                        builder,
334                        field,
335                        col,
336                        Decimal128Builder,
337                        String,
338                        row,
339                        idx,
340                        |v: String| {
341                            let decimal = v.parse::<bigdecimal::BigDecimal>().map_err(|e| {
342                                DataFusionError::Execution(format!(
343                                    "Failed to parse BigDecimal from {v:?}: {e:?}",
344                                ))
345                            })?;
346                            big_decimal_to_i128(&decimal, Some(*scale as i32))
347                        }
348                    );
349                }
350                DataType::Timestamp(TimeUnit::Second, None) => {
351                    handle_primitive_type!(
352                        builder,
353                        field,
354                        col,
355                        TimestampSecondBuilder,
356                        chrono::NaiveDateTime,
357                        row,
358                        idx,
359                        |v: chrono::NaiveDateTime| {
360                            let t = v.and_utc().timestamp();
361                            Ok::<_, DataFusionError>(t)
362                        }
363                    );
364                }
365                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
366                    handle_primitive_type!(
367                        builder,
368                        field,
369                        col,
370                        TimestampNanosecondBuilder,
371                        chrono::NaiveDateTime,
372                        row,
373                        idx,
374                        |v: chrono::NaiveDateTime| {
375                            v.and_utc().timestamp_nanos_opt().ok_or_else(|| {
376                                DataFusionError::Execution(format!(
377                                    "Failed to convert chrono::NaiveDateTime {v} to nanos timestamp"
378                                ))
379                            })
380                        }
381                    );
382                }
383                DataType::Date64 => {
384                    handle_primitive_type!(
385                        builder,
386                        field,
387                        col,
388                        Date64Builder,
389                        chrono::NaiveDateTime,
390                        row,
391                        idx,
392                        |v: chrono::NaiveDateTime| {
393                            Ok::<_, DataFusionError>(v.and_utc().timestamp_millis())
394                        }
395                    );
396                }
397                DataType::Boolean => {
398                    handle_primitive_type!(
399                        builder,
400                        field,
401                        col,
402                        BooleanBuilder,
403                        bool,
404                        row,
405                        idx,
406                        just_return
407                    );
408                }
409                DataType::Binary => {
410                    handle_primitive_type!(
411                        builder,
412                        field,
413                        col,
414                        BinaryBuilder,
415                        Vec<u8>,
416                        row,
417                        idx,
418                        just_return
419                    );
420                }
421                DataType::LargeBinary => {
422                    handle_primitive_type!(
423                        builder,
424                        field,
425                        col,
426                        LargeBinaryBuilder,
427                        Vec<u8>,
428                        row,
429                        idx,
430                        just_return
431                    );
432                }
433                DataType::BinaryView => {
434                    handle_primitive_type!(
435                        builder,
436                        field,
437                        col,
438                        BinaryViewBuilder,
439                        Vec<u8>,
440                        row,
441                        idx,
442                        just_return
443                    );
444                }
445                DataType::Struct(fields) => {
446                    let builder = builder
447                        .as_any_mut()
448                        .downcast_mut::<StructBuilder>()
449                        .unwrap_or_else(|| {
450                            panic!("Failed to downcast builder to StructBuilder for {field:?} and {col:?}")
451                        });
452                    let object = row.get::<usize, Option<Object>>(idx).map_err(|e| {
453                        DataFusionError::Execution(format!(
454                            "Failed to get object for {field:?} and {col:?}: {e:?}"
455                        ))
456                    })?;
457                    append_object_to_struct_builder(builder, fields, object)?;
458                }
459                _ => {
460                    return Err(DataFusionError::NotImplemented(format!(
461                        "Unsupported data type {:?} for col: {:?}",
462                        field.data_type(),
463                        col
464                    )));
465                }
466            }
467        }
468    }
469
470    let projected_columns = array_builders
471        .into_iter()
472        .enumerate()
473        .filter(|(idx, _)| projections_contains(projection, *idx))
474        .map(|(_, mut builder)| builder.finish())
475        .collect::<Vec<ArrayRef>>();
476    let options = RecordBatchOptions::new().with_row_count(Some(rows.len()));
477    Ok(RecordBatch::try_new_with_options(
478        projected_schema,
479        projected_columns,
480        &options,
481    )?)
482}
483
/// Appends attribute `$field.name()` of the optional Oracle object `$object_opt` to child
/// builder `$field_idx` of `$struct_builder`, downcast to `$field_builder_type`.
///
/// A null is appended when the object itself is `None` or the attribute value is NULL.
/// Non-null values go through `$convert`, which must return a `Result`. Lookup, read and
/// conversion errors are returned from the enclosing function.
macro_rules! append_object_attr {
    ($struct_builder:expr, $field_builder_type:ty, $field:expr, $field_idx:expr, $object_opt:expr, $field_value_ty:ty, $convert:expr) => {{
        let Some(child_builder) =
            $struct_builder.field_builder::<$field_builder_type>($field_idx)
        else {
            return Err(DataFusionError::Execution(format!(
                "Failed to get {} field builder for {:?}",
                stringify!($field_builder_type),
                $field,
            )));
        };

        let attr_value: Option<$field_value_ty> = match &$object_opt {
            None => None,
            Some(object) => object
                .get::<Option<$field_value_ty>>($field.name())
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to get {} field value for {:?}: {e:?}",
                        stringify!($field_value_ty),
                        $field,
                    ))
                })?,
        };

        if let Some(value) = attr_value {
            child_builder.append_value($convert(value)?);
        } else {
            child_builder.append_null();
        }
    }};
}
523
524fn append_object_to_struct_builder(
525    builder: &mut StructBuilder,
526    fields: &Fields,
527    object_opt: Option<Object>,
528) -> DFResult<()> {
529    for (idx, field) in fields.iter().enumerate() {
530        match field.data_type() {
531            DataType::Int32 => {
532                append_object_attr!(
533                    builder,
534                    Int32Builder,
535                    field,
536                    idx,
537                    object_opt,
538                    i32,
539                    just_return
540                );
541            }
542            DataType::Int64 => {
543                append_object_attr!(
544                    builder,
545                    Int64Builder,
546                    field,
547                    idx,
548                    object_opt,
549                    i64,
550                    just_return
551                );
552            }
553            DataType::Float32 => {
554                append_object_attr!(
555                    builder,
556                    Float32Builder,
557                    field,
558                    idx,
559                    object_opt,
560                    f32,
561                    just_return
562                );
563            }
564            DataType::Float64 => {
565                append_object_attr!(
566                    builder,
567                    Float64Builder,
568                    field,
569                    idx,
570                    object_opt,
571                    f64,
572                    just_return
573                );
574            }
575            DataType::Decimal128(_precision, scale) => {
576                append_object_attr!(
577                    builder,
578                    Decimal128Builder,
579                    field,
580                    idx,
581                    object_opt,
582                    String,
583                    |v: String| {
584                        let decimal = v.parse::<bigdecimal::BigDecimal>().map_err(|e| {
585                            DataFusionError::Execution(format!(
586                                "Failed to parse BigDecimal from {v:?}: {e:?}",
587                            ))
588                        })?;
589                        big_decimal_to_i128(&decimal, Some(*scale as i32))
590                    }
591                );
592            }
593            DataType::Binary => {
594                append_object_attr!(
595                    builder,
596                    BinaryBuilder,
597                    field,
598                    idx,
599                    object_opt,
600                    Vec<u8>,
601                    just_return
602                );
603            }
604            DataType::LargeBinary => {
605                append_object_attr!(
606                    builder,
607                    LargeBinaryBuilder,
608                    field,
609                    idx,
610                    object_opt,
611                    Vec<u8>,
612                    just_return
613                );
614            }
615            DataType::Struct(fields) => {
616                let field_builder =
617                    builder.field_builder::<StructBuilder>(idx).ok_or_else(|| {
618                        DataFusionError::Execution(format!(
619                            "Failed to get struct field builder for {field:?}"
620                        ))
621                    })?;
622                match &object_opt {
623                    Some(object) => {
624                        let field_value =
625                            object.get::<Option<Object>>(field.name()).map_err(|e| {
626                                DataFusionError::Execution(format!(
627                                    "Failed to get object value for {field:?}: {e:?}"
628                                ))
629                            })?;
630                        append_object_to_struct_builder(field_builder, fields, field_value)?;
631                    }
632                    None => {
633                        field_builder.append_null();
634                    }
635                }
636            }
637            _ => {
638                return Err(DataFusionError::NotImplemented(format!(
639                    "Unsupported struct field type {}",
640                    field.data_type(),
641                )));
642            }
643        }
644    }
645
646    match &object_opt {
647        Some(_) => {
648            builder.append(true);
649        }
650        None => {
651            builder.append_null();
652        }
653    }
654    Ok(())
655}