datafusion_remote_table/connection/
oracle.rs

1use crate::connection::{RemoteDbType, just_return, projections_contains};
2use crate::utils::big_decimal_to_i128;
3use crate::{
4    Connection, ConnectionOptions, DFResult, Literalize, OracleConnectionOptions, OracleType, Pool,
5    PoolState, RemoteField, RemoteSchema, RemoteSchemaRef, RemoteSource, RemoteType,
6};
7use bb8_oracle::OracleConnectionManager;
8use datafusion::arrow::array::{
9    ArrayRef, BinaryBuilder, BooleanBuilder, Date64Builder, Decimal128Builder, Float32Builder,
10    Float64Builder, Int16Builder, Int32Builder, Int64Builder, LargeBinaryBuilder,
11    LargeStringBuilder, RecordBatch, RecordBatchOptions, StringBuilder, StructBuilder,
12    TimestampNanosecondBuilder, TimestampSecondBuilder, make_builder,
13};
14use datafusion::arrow::datatypes::{DataType, Fields, SchemaRef, TimeUnit};
15use datafusion::common::{DataFusionError, project_schema};
16use datafusion::execution::SendableRecordBatchStream;
17use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
18use futures::StreamExt;
19use log::debug;
20use oracle::sql_type::{Object, OracleType as ColumnType};
21use oracle::{Connector, Row};
22use std::any::Any;
23use std::sync::Arc;
24
25#[derive(Debug)]
26pub struct OraclePool {
27    pool: bb8::Pool<OracleConnectionManager>,
28}
29
30pub(crate) async fn connect_oracle(options: &OracleConnectionOptions) -> DFResult<OraclePool> {
31    let connect_string = format!(
32        "//{}:{}/{}",
33        options.host, options.port, options.service_name
34    );
35    let connector = Connector::new(
36        options.username.clone(),
37        options.password.clone(),
38        connect_string,
39    );
40    let _ = connector
41        .connect()
42        .map_err(|e| DataFusionError::Internal(format!("Failed to connect to oracle: {e:?}")))?;
43    let manager = OracleConnectionManager::from_connector(connector);
44    let pool = bb8::Pool::builder()
45        .max_size(options.pool_max_size as u32)
46        .min_idle(Some(options.pool_min_idle as u32))
47        .idle_timeout(Some(options.pool_idle_timeout))
48        .reaper_rate(options.pool_ttl_check_interval)
49        .build(manager)
50        .await
51        .map_err(|e| DataFusionError::Internal(format!("Failed to create oracle pool: {e:?}")))?;
52    Ok(OraclePool { pool })
53}
54
55#[async_trait::async_trait]
56impl Pool for OraclePool {
57    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
58        let conn = self.pool.get_owned().await.map_err(|e| {
59            DataFusionError::Execution(format!("Failed to get oracle connection due to {e:?}"))
60        })?;
61        Ok(Arc::new(OracleConnection { conn }))
62    }
63
64    async fn state(&self) -> DFResult<PoolState> {
65        let bb8_state = self.pool.state();
66        Ok(PoolState {
67            connections: bb8_state.connections as usize,
68            idle_connections: bb8_state.idle_connections as usize,
69        })
70    }
71}
72
73#[derive(Debug)]
74pub struct OracleConnection {
75    conn: bb8::PooledConnection<'static, OracleConnectionManager>,
76}
77
78#[async_trait::async_trait]
79impl Connection for OracleConnection {
80    fn as_any(&self) -> &dyn Any {
81        self
82    }
83
84    async fn infer_schema(&self, source: &RemoteSource) -> DFResult<RemoteSchemaRef> {
85        let sql = RemoteDbType::Oracle.limit_1_query_if_possible(source);
86        let result_set = self.conn.query(&sql, &[]).map_err(|e| {
87            DataFusionError::Plan(format!("Failed to execute query {sql} on oracle: {e:?}"))
88        })?;
89        let remote_schema = Arc::new(build_remote_schema(&result_set)?);
90        Ok(remote_schema)
91    }
92
93    async fn query(
94        &self,
95        conn_options: &ConnectionOptions,
96        source: &RemoteSource,
97        table_schema: SchemaRef,
98        projection: Option<&Vec<usize>>,
99        unparsed_filters: &[String],
100        limit: Option<usize>,
101    ) -> DFResult<SendableRecordBatchStream> {
102        let projected_schema = project_schema(&table_schema, projection)?;
103
104        let sql = RemoteDbType::Oracle.rewrite_query(source, unparsed_filters, limit);
105        debug!("[remote-table] executing oracle query: {sql}");
106
107        let projection = projection.cloned();
108        let chunk_size = conn_options.stream_chunk_size();
109        let result_set = self.conn.query(&sql, &[]).map_err(|e| {
110            DataFusionError::Execution(format!("Failed to execute query on oracle: {e:?}"))
111        })?;
112        let stream = futures::stream::iter(result_set).chunks(chunk_size).boxed();
113
114        let stream = stream.map(move |rows| {
115            let rows: Vec<Row> = rows
116                .into_iter()
117                .collect::<Result<Vec<_>, _>>()
118                .map_err(|e| {
119                    DataFusionError::Execution(format!(
120                        "Failed to collect rows from oracle due to {e}",
121                    ))
122                })?;
123            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
124        });
125
126        Ok(Box::pin(RecordBatchStreamAdapter::new(
127            projected_schema,
128            stream,
129        )))
130    }
131
132    async fn insert(
133        &self,
134        _conn_options: &ConnectionOptions,
135        _literalizer: Arc<dyn Literalize>,
136        _table: &[String],
137        _remote_schema: RemoteSchemaRef,
138        _input: SendableRecordBatchStream,
139    ) -> DFResult<usize> {
140        Err(DataFusionError::Execution(
141            "Insert operation is not supported for oracle".to_string(),
142        ))
143    }
144}
145
146fn oracle_type_to_remote_type(oracle_type: &ColumnType) -> DFResult<OracleType> {
147    match oracle_type {
148        ColumnType::Number(precision, scale) => {
149            // TODO need more investigation on the precision and scale
150            let precision = if *precision == 0 { 38 } else { *precision };
151            let scale = if *scale == -127 { 0 } else { *scale };
152            Ok(OracleType::Number(precision, scale))
153        }
154        ColumnType::BinaryFloat => Ok(OracleType::BinaryFloat),
155        ColumnType::BinaryDouble => Ok(OracleType::BinaryDouble),
156        ColumnType::Float(precision) => Ok(OracleType::Float(*precision)),
157        ColumnType::Varchar2(size) => Ok(OracleType::Varchar2(*size)),
158        ColumnType::NVarchar2(size) => Ok(OracleType::NVarchar2(*size)),
159        ColumnType::Char(size) => Ok(OracleType::Char(*size)),
160        ColumnType::NChar(size) => Ok(OracleType::NChar(*size)),
161        ColumnType::Long => Ok(OracleType::Long),
162        ColumnType::CLOB => Ok(OracleType::Clob),
163        ColumnType::NCLOB => Ok(OracleType::NClob),
164        ColumnType::Raw(size) => Ok(OracleType::Raw(*size)),
165        ColumnType::LongRaw => Ok(OracleType::LongRaw),
166        ColumnType::BLOB => Ok(OracleType::Blob),
167        ColumnType::Date => Ok(OracleType::Date),
168        ColumnType::Timestamp(_) => Ok(OracleType::Timestamp),
169        ColumnType::Boolean => Ok(OracleType::Boolean),
170        ColumnType::Object(_) => Ok(OracleType::SdeGeometry),
171        _ => Err(DataFusionError::NotImplemented(format!(
172            "Unsupported oracle type: {oracle_type:?}",
173        ))),
174    }
175}
176
177fn build_remote_schema(result_set: &oracle::ResultSet<Row>) -> DFResult<RemoteSchema> {
178    let mut remote_fields = vec![];
179    for col in result_set.column_info() {
180        let remote_type = RemoteType::Oracle(oracle_type_to_remote_type(col.oracle_type())?);
181        remote_fields.push(RemoteField::new(col.name(), remote_type, col.nullable()));
182    }
183    Ok(RemoteSchema::new(remote_fields))
184}
185
/// Reads one nullable column value from a row and appends it to a typed Arrow builder.
///
/// Arguments, in order: the dynamic builder, the Arrow field and the Oracle
/// column info (both only used in diagnostics), the concrete builder type, the
/// Rust type to fetch from the row, the row, the column index, and a fallible
/// conversion applied to non-null values before appending.
///
/// Panics if the builder is not of the stated concrete type. Uses `?`, so it
/// must be expanded inside a function returning a compatible `Result`.
macro_rules! handle_primitive_type {
    ($dyn_builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $col_idx:expr, $convert:expr) => {{
        let typed_builder = match $dyn_builder.as_any_mut().downcast_mut::<$builder_ty>() {
            Some(typed_builder) => typed_builder,
            None => panic!(
                "Failed to downcast builder to {} for {:?} and {:?}",
                stringify!($builder_ty),
                $field,
                $col
            ),
        };

        let fetched: Option<$value_ty> = $row
            .get::<usize, Option<$value_ty>>($col_idx)
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to get {} value for {:?} and {:?}: {e:?}",
                    stringify!($value_ty),
                    $field,
                    $col
                ))
            })?;

        if let Some(value) = fetched {
            typed_builder.append_value($convert(value)?)
        } else {
            typed_builder.append_null()
        }
    }};
}
214
215fn rows_to_batch(
216    rows: &[Row],
217    table_schema: &SchemaRef,
218    projection: Option<&Vec<usize>>,
219) -> DFResult<RecordBatch> {
220    let projected_schema = project_schema(table_schema, projection)?;
221    let mut array_builders = vec![];
222    for field in table_schema.fields() {
223        let builder = make_builder(field.data_type(), rows.len());
224        array_builders.push(builder);
225    }
226
227    for row in rows {
228        for (idx, field) in table_schema.fields.iter().enumerate() {
229            if !projections_contains(projection, idx) {
230                continue;
231            }
232            let builder = &mut array_builders[idx];
233            let col = row.column_info().get(idx);
234            match field.data_type() {
235                DataType::Int16 => {
236                    handle_primitive_type!(
237                        builder,
238                        field,
239                        col,
240                        Int16Builder,
241                        i16,
242                        row,
243                        idx,
244                        just_return
245                    );
246                }
247                DataType::Int32 => {
248                    handle_primitive_type!(
249                        builder,
250                        field,
251                        col,
252                        Int32Builder,
253                        i32,
254                        row,
255                        idx,
256                        just_return
257                    );
258                }
259                DataType::Int64 => {
260                    handle_primitive_type!(
261                        builder,
262                        field,
263                        col,
264                        Int64Builder,
265                        i64,
266                        row,
267                        idx,
268                        just_return
269                    );
270                }
271                DataType::Float32 => {
272                    handle_primitive_type!(
273                        builder,
274                        field,
275                        col,
276                        Float32Builder,
277                        f32,
278                        row,
279                        idx,
280                        just_return
281                    );
282                }
283                DataType::Float64 => {
284                    handle_primitive_type!(
285                        builder,
286                        field,
287                        col,
288                        Float64Builder,
289                        f64,
290                        row,
291                        idx,
292                        just_return
293                    );
294                }
295                DataType::Utf8 => {
296                    handle_primitive_type!(
297                        builder,
298                        field,
299                        col,
300                        StringBuilder,
301                        String,
302                        row,
303                        idx,
304                        just_return
305                    );
306                }
307                DataType::LargeUtf8 => {
308                    handle_primitive_type!(
309                        builder,
310                        field,
311                        col,
312                        LargeStringBuilder,
313                        String,
314                        row,
315                        idx,
316                        just_return
317                    );
318                }
319                DataType::Decimal128(_precision, scale) => {
320                    handle_primitive_type!(
321                        builder,
322                        field,
323                        col,
324                        Decimal128Builder,
325                        String,
326                        row,
327                        idx,
328                        |v: String| {
329                            let decimal = v.parse::<bigdecimal::BigDecimal>().map_err(|e| {
330                                DataFusionError::Execution(format!(
331                                    "Failed to parse BigDecimal from {v:?}: {e:?}",
332                                ))
333                            })?;
334                            big_decimal_to_i128(&decimal, Some(*scale as i32))
335                        }
336                    );
337                }
338                DataType::Timestamp(TimeUnit::Second, None) => {
339                    handle_primitive_type!(
340                        builder,
341                        field,
342                        col,
343                        TimestampSecondBuilder,
344                        chrono::NaiveDateTime,
345                        row,
346                        idx,
347                        |v: chrono::NaiveDateTime| {
348                            let t = v.and_utc().timestamp();
349                            Ok::<_, DataFusionError>(t)
350                        }
351                    );
352                }
353                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
354                    handle_primitive_type!(
355                        builder,
356                        field,
357                        col,
358                        TimestampNanosecondBuilder,
359                        chrono::NaiveDateTime,
360                        row,
361                        idx,
362                        |v: chrono::NaiveDateTime| {
363                            v.and_utc().timestamp_nanos_opt().ok_or_else(|| {
364                                DataFusionError::Execution(format!(
365                                    "Failed to convert chrono::NaiveDateTime {v} to nanos timestamp"
366                                ))
367                            })
368                        }
369                    );
370                }
371                DataType::Date64 => {
372                    handle_primitive_type!(
373                        builder,
374                        field,
375                        col,
376                        Date64Builder,
377                        chrono::NaiveDateTime,
378                        row,
379                        idx,
380                        |v: chrono::NaiveDateTime| {
381                            Ok::<_, DataFusionError>(v.and_utc().timestamp_millis())
382                        }
383                    );
384                }
385                DataType::Boolean => {
386                    handle_primitive_type!(
387                        builder,
388                        field,
389                        col,
390                        BooleanBuilder,
391                        bool,
392                        row,
393                        idx,
394                        just_return
395                    );
396                }
397                DataType::Binary => {
398                    handle_primitive_type!(
399                        builder,
400                        field,
401                        col,
402                        BinaryBuilder,
403                        Vec<u8>,
404                        row,
405                        idx,
406                        just_return
407                    );
408                }
409                DataType::LargeBinary => {
410                    handle_primitive_type!(
411                        builder,
412                        field,
413                        col,
414                        LargeBinaryBuilder,
415                        Vec<u8>,
416                        row,
417                        idx,
418                        just_return
419                    );
420                }
421                DataType::Struct(fields) => {
422                    let builder = builder
423                        .as_any_mut()
424                        .downcast_mut::<StructBuilder>()
425                        .unwrap_or_else(|| {
426                            panic!("Failed to downcast builder to StructBuilder for {field:?} and {col:?}")
427                        });
428                    let object = row.get::<usize, Option<Object>>(idx).map_err(|e| {
429                        DataFusionError::Execution(format!(
430                            "Failed to get object for {field:?} and {col:?}: {e:?}"
431                        ))
432                    })?;
433                    append_object_to_struct_builder(builder, fields, object)?;
434                }
435                _ => {
436                    return Err(DataFusionError::NotImplemented(format!(
437                        "Unsupported data type {:?} for col: {:?}",
438                        field.data_type(),
439                        col
440                    )));
441                }
442            }
443        }
444    }
445
446    let projected_columns = array_builders
447        .into_iter()
448        .enumerate()
449        .filter(|(idx, _)| projections_contains(projection, *idx))
450        .map(|(_, mut builder)| builder.finish())
451        .collect::<Vec<ArrayRef>>();
452    let options = RecordBatchOptions::new().with_row_count(Some(rows.len()));
453    Ok(RecordBatch::try_new_with_options(
454        projected_schema,
455        projected_columns,
456        &options,
457    )?)
458}
459
/// Appends one attribute of an optional Oracle object to a child builder of a `StructBuilder`.
///
/// Arguments, in order: the struct builder, the concrete child builder type,
/// the Arrow field (its name is the attribute name looked up on the object),
/// the child index, the optional object, the Rust type to fetch, and a
/// fallible conversion applied to non-null values before appending.
///
/// A null is appended when the object itself is `None` or the attribute is
/// null. Uses `?`, so it must be expanded inside a function returning a
/// compatible `Result`.
macro_rules! append_object_attr {
    ($struct_builder:expr, $child_builder_ty:ty, $field:expr, $child_idx:expr, $object_opt:expr, $attr_ty:ty, $convert:expr) => {{
        let child_builder = $struct_builder
            .field_builder::<$child_builder_ty>($child_idx)
            .ok_or_else(|| {
                DataFusionError::Execution(format!(
                    "Failed to get {} field builder for {:?}",
                    stringify!($child_builder_ty),
                    $field,
                ))
            })?;

        // A missing object yields a null attribute without touching the driver.
        let attr_value: Option<$attr_ty> = match &$object_opt {
            Some(object) => object
                .get::<Option<$attr_ty>>($field.name())
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to get {} field value for {:?}: {e:?}",
                        stringify!($attr_ty),
                        $field,
                    ))
                })?,
            None => None,
        };

        if let Some(value) = attr_value {
            child_builder.append_value($convert(value)?);
        } else {
            child_builder.append_null();
        }
    }};
}
499
500fn append_object_to_struct_builder(
501    builder: &mut StructBuilder,
502    fields: &Fields,
503    object_opt: Option<Object>,
504) -> DFResult<()> {
505    for (idx, field) in fields.iter().enumerate() {
506        match field.data_type() {
507            DataType::Int32 => {
508                append_object_attr!(
509                    builder,
510                    Int32Builder,
511                    field,
512                    idx,
513                    object_opt,
514                    i32,
515                    just_return
516                );
517            }
518            DataType::Int64 => {
519                append_object_attr!(
520                    builder,
521                    Int64Builder,
522                    field,
523                    idx,
524                    object_opt,
525                    i64,
526                    just_return
527                );
528            }
529            DataType::Float32 => {
530                append_object_attr!(
531                    builder,
532                    Float32Builder,
533                    field,
534                    idx,
535                    object_opt,
536                    f32,
537                    just_return
538                );
539            }
540            DataType::Float64 => {
541                append_object_attr!(
542                    builder,
543                    Float64Builder,
544                    field,
545                    idx,
546                    object_opt,
547                    f64,
548                    just_return
549                );
550            }
551            DataType::Decimal128(_precision, scale) => {
552                append_object_attr!(
553                    builder,
554                    Decimal128Builder,
555                    field,
556                    idx,
557                    object_opt,
558                    String,
559                    |v: String| {
560                        let decimal = v.parse::<bigdecimal::BigDecimal>().map_err(|e| {
561                            DataFusionError::Execution(format!(
562                                "Failed to parse BigDecimal from {v:?}: {e:?}",
563                            ))
564                        })?;
565                        big_decimal_to_i128(&decimal, Some(*scale as i32))
566                    }
567                );
568            }
569            DataType::Binary => {
570                append_object_attr!(
571                    builder,
572                    BinaryBuilder,
573                    field,
574                    idx,
575                    object_opt,
576                    Vec<u8>,
577                    just_return
578                );
579            }
580            DataType::LargeBinary => {
581                append_object_attr!(
582                    builder,
583                    LargeBinaryBuilder,
584                    field,
585                    idx,
586                    object_opt,
587                    Vec<u8>,
588                    just_return
589                );
590            }
591            DataType::Struct(fields) => {
592                let field_builder =
593                    builder.field_builder::<StructBuilder>(idx).ok_or_else(|| {
594                        DataFusionError::Execution(format!(
595                            "Failed to get struct field builder for {field:?}"
596                        ))
597                    })?;
598                match &object_opt {
599                    Some(object) => {
600                        let field_value =
601                            object.get::<Option<Object>>(field.name()).map_err(|e| {
602                                DataFusionError::Execution(format!(
603                                    "Failed to get object value for {field:?}: {e:?}"
604                                ))
605                            })?;
606                        append_object_to_struct_builder(field_builder, fields, field_value)?;
607                    }
608                    None => {
609                        field_builder.append_null();
610                    }
611                }
612            }
613            _ => {
614                return Err(DataFusionError::NotImplemented(format!(
615                    "Unsupported struct field type {}",
616                    field.data_type(),
617                )));
618            }
619        }
620    }
621
622    match &object_opt {
623        Some(_) => {
624            builder.append(true);
625        }
626        None => {
627            builder.append_null();
628        }
629    }
630    Ok(())
631}