datafusion_remote_table/connection/
oracle.rs

1use crate::connection::{RemoteDbType, just_return, projections_contains};
2use crate::utils::big_decimal_to_i128;
3use crate::{
4    Connection, ConnectionOptions, DFResult, Literalize, OracleType, Pool, RemoteField,
5    RemoteSchema, RemoteSchemaRef, RemoteSource, RemoteType,
6};
7use bb8_oracle::OracleConnectionManager;
8use datafusion::arrow::array::{
9    ArrayRef, BinaryBuilder, BooleanBuilder, Date64Builder, Decimal128Builder, Float32Builder,
10    Float64Builder, Int16Builder, Int32Builder, Int64Builder, LargeBinaryBuilder,
11    LargeStringBuilder, RecordBatch, RecordBatchOptions, StringBuilder, StructBuilder,
12    TimestampNanosecondBuilder, TimestampSecondBuilder, make_builder,
13};
14use datafusion::arrow::datatypes::{DataType, Fields, SchemaRef, TimeUnit};
15use datafusion::common::{DataFusionError, project_schema};
16use datafusion::execution::SendableRecordBatchStream;
17use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
18use derive_getters::Getters;
19use derive_with::With;
20use futures::StreamExt;
21use log::debug;
22use oracle::sql_type::{Object, OracleType as ColumnType};
23use oracle::{Connector, Row};
24use std::any::Any;
25use std::sync::Arc;
26
27#[derive(Debug, Clone, With, Getters)]
28pub struct OracleConnectionOptions {
29    pub(crate) host: String,
30    pub(crate) port: u16,
31    pub(crate) username: String,
32    pub(crate) password: String,
33    pub(crate) service_name: String,
34    pub(crate) pool_max_size: usize,
35    pub(crate) stream_chunk_size: usize,
36}
37
38impl OracleConnectionOptions {
39    pub fn new(
40        host: impl Into<String>,
41        port: u16,
42        username: impl Into<String>,
43        password: impl Into<String>,
44        service_name: impl Into<String>,
45    ) -> Self {
46        Self {
47            host: host.into(),
48            port,
49            username: username.into(),
50            password: password.into(),
51            service_name: service_name.into(),
52            pool_max_size: 10,
53            stream_chunk_size: 2048,
54        }
55    }
56}
57
58impl From<OracleConnectionOptions> for ConnectionOptions {
59    fn from(options: OracleConnectionOptions) -> Self {
60        ConnectionOptions::Oracle(options)
61    }
62}
63
64#[derive(Debug)]
65pub struct OraclePool {
66    pool: bb8::Pool<OracleConnectionManager>,
67}
68
69pub(crate) async fn connect_oracle(options: &OracleConnectionOptions) -> DFResult<OraclePool> {
70    let connect_string = format!(
71        "//{}:{}/{}",
72        options.host, options.port, options.service_name
73    );
74    let connector = Connector::new(
75        options.username.clone(),
76        options.password.clone(),
77        connect_string,
78    );
79    let _ = connector
80        .connect()
81        .map_err(|e| DataFusionError::Internal(format!("Failed to connect to oracle: {e:?}")))?;
82    let manager = OracleConnectionManager::from_connector(connector);
83    let pool = bb8::Pool::builder()
84        .max_size(options.pool_max_size as u32)
85        .build(manager)
86        .await
87        .map_err(|e| DataFusionError::Internal(format!("Failed to create oracle pool: {e:?}")))?;
88    Ok(OraclePool { pool })
89}
90
91#[async_trait::async_trait]
92impl Pool for OraclePool {
93    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
94        let conn = self.pool.get_owned().await.map_err(|e| {
95            DataFusionError::Execution(format!("Failed to get oracle connection due to {e:?}"))
96        })?;
97        Ok(Arc::new(OracleConnection { conn }))
98    }
99}
100
101#[derive(Debug)]
102pub struct OracleConnection {
103    conn: bb8::PooledConnection<'static, OracleConnectionManager>,
104}
105
106#[async_trait::async_trait]
107impl Connection for OracleConnection {
108    fn as_any(&self) -> &dyn Any {
109        self
110    }
111
112    async fn infer_schema(&self, source: &RemoteSource) -> DFResult<RemoteSchemaRef> {
113        let sql = RemoteDbType::Oracle.limit_1_query_if_possible(source);
114        let result_set = self.conn.query(&sql, &[]).map_err(|e| {
115            DataFusionError::Execution(format!("Failed to execute query {sql} on oracle: {e:?}"))
116        })?;
117        let remote_schema = Arc::new(build_remote_schema(&result_set)?);
118        Ok(remote_schema)
119    }
120
121    async fn query(
122        &self,
123        conn_options: &ConnectionOptions,
124        source: &RemoteSource,
125        table_schema: SchemaRef,
126        projection: Option<&Vec<usize>>,
127        unparsed_filters: &[String],
128        limit: Option<usize>,
129    ) -> DFResult<SendableRecordBatchStream> {
130        let projected_schema = project_schema(&table_schema, projection)?;
131
132        let sql = RemoteDbType::Oracle.rewrite_query(source, unparsed_filters, limit);
133        debug!("[remote-table] executing oracle query: {sql}");
134
135        let projection = projection.cloned();
136        let chunk_size = conn_options.stream_chunk_size();
137        let result_set = self.conn.query(&sql, &[]).map_err(|e| {
138            DataFusionError::Execution(format!("Failed to execute query on oracle: {e:?}"))
139        })?;
140        let stream = futures::stream::iter(result_set).chunks(chunk_size).boxed();
141
142        let stream = stream.map(move |rows| {
143            let rows: Vec<Row> = rows
144                .into_iter()
145                .collect::<Result<Vec<_>, _>>()
146                .map_err(|e| {
147                    DataFusionError::Execution(format!(
148                        "Failed to collect rows from oracle due to {e}",
149                    ))
150                })?;
151            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
152        });
153
154        Ok(Box::pin(RecordBatchStreamAdapter::new(
155            projected_schema,
156            stream,
157        )))
158    }
159
160    async fn insert(
161        &self,
162        _conn_options: &ConnectionOptions,
163        _literalizer: Arc<dyn Literalize>,
164        _table: &[String],
165        _remote_schema: RemoteSchemaRef,
166        _input: SendableRecordBatchStream,
167    ) -> DFResult<usize> {
168        Err(DataFusionError::Execution(
169            "Insert operation is not supported for oracle".to_string(),
170        ))
171    }
172}
173
174fn oracle_type_to_remote_type(oracle_type: &ColumnType) -> DFResult<OracleType> {
175    match oracle_type {
176        ColumnType::Number(precision, scale) => {
177            // TODO need more investigation on the precision and scale
178            let precision = if *precision == 0 { 38 } else { *precision };
179            let scale = if *scale == -127 { 0 } else { *scale };
180            Ok(OracleType::Number(precision, scale))
181        }
182        ColumnType::BinaryFloat => Ok(OracleType::BinaryFloat),
183        ColumnType::BinaryDouble => Ok(OracleType::BinaryDouble),
184        ColumnType::Float(precision) => Ok(OracleType::Float(*precision)),
185        ColumnType::Varchar2(size) => Ok(OracleType::Varchar2(*size)),
186        ColumnType::NVarchar2(size) => Ok(OracleType::NVarchar2(*size)),
187        ColumnType::Char(size) => Ok(OracleType::Char(*size)),
188        ColumnType::NChar(size) => Ok(OracleType::NChar(*size)),
189        ColumnType::Long => Ok(OracleType::Long),
190        ColumnType::CLOB => Ok(OracleType::Clob),
191        ColumnType::NCLOB => Ok(OracleType::NClob),
192        ColumnType::Raw(size) => Ok(OracleType::Raw(*size)),
193        ColumnType::LongRaw => Ok(OracleType::LongRaw),
194        ColumnType::BLOB => Ok(OracleType::Blob),
195        ColumnType::Date => Ok(OracleType::Date),
196        ColumnType::Timestamp(_) => Ok(OracleType::Timestamp),
197        ColumnType::Boolean => Ok(OracleType::Boolean),
198        ColumnType::Object(_) => Ok(OracleType::SdeGeometry),
199        _ => Err(DataFusionError::NotImplemented(format!(
200            "Unsupported oracle type: {oracle_type:?}",
201        ))),
202    }
203}
204
205fn build_remote_schema(result_set: &oracle::ResultSet<Row>) -> DFResult<RemoteSchema> {
206    let mut remote_fields = vec![];
207    for col in result_set.column_info() {
208        let remote_type = RemoteType::Oracle(oracle_type_to_remote_type(col.oracle_type())?);
209        remote_fields.push(RemoteField::new(col.name(), remote_type, col.nullable()));
210    }
211    Ok(RemoteSchema::new(remote_fields))
212}
213
/// Reads column `$index` of `$row` as `Option<$value_ty>` and appends it to
/// `$builder`, which must be a `$builder_ty`.
///
/// `$convert` maps the fetched value to the builder's native value and may
/// fail. `$field` and `$col` are only used in error messages.
///
/// Must be expanded inside a function returning `Result<_, DataFusionError>`:
/// a builder type mismatch, a fetch failure and a conversion failure are all
/// propagated with `?` instead of panicking.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        // A mismatch between the Arrow data type and the builder is reported
        // as an error, so a bad schema cannot abort the whole process.
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .ok_or_else(|| {
                DataFusionError::Internal(format!(
                    "Failed to downcast builder to {} for {:?} and {:?}",
                    stringify!($builder_ty),
                    $field,
                    $col
                ))
            })?;
        let v = $row.get::<usize, Option<$value_ty>>($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        match v {
            Some(v) => builder.append_value($convert(v)?),
            None => builder.append_null(),
        }
    }};
}
242
243fn rows_to_batch(
244    rows: &[Row],
245    table_schema: &SchemaRef,
246    projection: Option<&Vec<usize>>,
247) -> DFResult<RecordBatch> {
248    let projected_schema = project_schema(table_schema, projection)?;
249    let mut array_builders = vec![];
250    for field in table_schema.fields() {
251        let builder = make_builder(field.data_type(), rows.len());
252        array_builders.push(builder);
253    }
254
255    for row in rows {
256        for (idx, field) in table_schema.fields.iter().enumerate() {
257            if !projections_contains(projection, idx) {
258                continue;
259            }
260            let builder = &mut array_builders[idx];
261            let col = row.column_info().get(idx);
262            match field.data_type() {
263                DataType::Int16 => {
264                    handle_primitive_type!(
265                        builder,
266                        field,
267                        col,
268                        Int16Builder,
269                        i16,
270                        row,
271                        idx,
272                        just_return
273                    );
274                }
275                DataType::Int32 => {
276                    handle_primitive_type!(
277                        builder,
278                        field,
279                        col,
280                        Int32Builder,
281                        i32,
282                        row,
283                        idx,
284                        just_return
285                    );
286                }
287                DataType::Int64 => {
288                    handle_primitive_type!(
289                        builder,
290                        field,
291                        col,
292                        Int64Builder,
293                        i64,
294                        row,
295                        idx,
296                        just_return
297                    );
298                }
299                DataType::Float32 => {
300                    handle_primitive_type!(
301                        builder,
302                        field,
303                        col,
304                        Float32Builder,
305                        f32,
306                        row,
307                        idx,
308                        just_return
309                    );
310                }
311                DataType::Float64 => {
312                    handle_primitive_type!(
313                        builder,
314                        field,
315                        col,
316                        Float64Builder,
317                        f64,
318                        row,
319                        idx,
320                        just_return
321                    );
322                }
323                DataType::Utf8 => {
324                    handle_primitive_type!(
325                        builder,
326                        field,
327                        col,
328                        StringBuilder,
329                        String,
330                        row,
331                        idx,
332                        just_return
333                    );
334                }
335                DataType::LargeUtf8 => {
336                    handle_primitive_type!(
337                        builder,
338                        field,
339                        col,
340                        LargeStringBuilder,
341                        String,
342                        row,
343                        idx,
344                        just_return
345                    );
346                }
347                DataType::Decimal128(_precision, scale) => {
348                    handle_primitive_type!(
349                        builder,
350                        field,
351                        col,
352                        Decimal128Builder,
353                        String,
354                        row,
355                        idx,
356                        |v: String| {
357                            let decimal = v.parse::<bigdecimal::BigDecimal>().map_err(|e| {
358                                DataFusionError::Execution(format!(
359                                    "Failed to parse BigDecimal from {v:?}: {e:?}",
360                                ))
361                            })?;
362                            big_decimal_to_i128(&decimal, Some(*scale as i32))
363                        }
364                    );
365                }
366                DataType::Timestamp(TimeUnit::Second, None) => {
367                    handle_primitive_type!(
368                        builder,
369                        field,
370                        col,
371                        TimestampSecondBuilder,
372                        chrono::NaiveDateTime,
373                        row,
374                        idx,
375                        |v: chrono::NaiveDateTime| {
376                            let t = v.and_utc().timestamp();
377                            Ok::<_, DataFusionError>(t)
378                        }
379                    );
380                }
381                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
382                    handle_primitive_type!(
383                        builder,
384                        field,
385                        col,
386                        TimestampNanosecondBuilder,
387                        chrono::NaiveDateTime,
388                        row,
389                        idx,
390                        |v: chrono::NaiveDateTime| {
391                            v.and_utc().timestamp_nanos_opt().ok_or_else(|| {
392                                DataFusionError::Execution(format!(
393                                    "Failed to convert chrono::NaiveDateTime {v} to nanos timestamp"
394                                ))
395                            })
396                        }
397                    );
398                }
399                DataType::Date64 => {
400                    handle_primitive_type!(
401                        builder,
402                        field,
403                        col,
404                        Date64Builder,
405                        chrono::NaiveDateTime,
406                        row,
407                        idx,
408                        |v: chrono::NaiveDateTime| {
409                            Ok::<_, DataFusionError>(v.and_utc().timestamp_millis())
410                        }
411                    );
412                }
413                DataType::Boolean => {
414                    handle_primitive_type!(
415                        builder,
416                        field,
417                        col,
418                        BooleanBuilder,
419                        bool,
420                        row,
421                        idx,
422                        just_return
423                    );
424                }
425                DataType::Binary => {
426                    handle_primitive_type!(
427                        builder,
428                        field,
429                        col,
430                        BinaryBuilder,
431                        Vec<u8>,
432                        row,
433                        idx,
434                        just_return
435                    );
436                }
437                DataType::LargeBinary => {
438                    handle_primitive_type!(
439                        builder,
440                        field,
441                        col,
442                        LargeBinaryBuilder,
443                        Vec<u8>,
444                        row,
445                        idx,
446                        just_return
447                    );
448                }
449                DataType::Struct(fields) => {
450                    let builder = builder
451                        .as_any_mut()
452                        .downcast_mut::<StructBuilder>()
453                        .unwrap_or_else(|| {
454                            panic!("Failed to downcast builder to StructBuilder for {field:?} and {col:?}")
455                        });
456                    let object = row.get::<usize, Option<Object>>(idx).map_err(|e| {
457                        DataFusionError::Execution(format!(
458                            "Failed to get object for {field:?} and {col:?}: {e:?}"
459                        ))
460                    })?;
461                    append_object_to_struct_builder(builder, fields, object)?;
462                }
463                _ => {
464                    return Err(DataFusionError::NotImplemented(format!(
465                        "Unsupported data type {:?} for col: {:?}",
466                        field.data_type(),
467                        col
468                    )));
469                }
470            }
471        }
472    }
473
474    let projected_columns = array_builders
475        .into_iter()
476        .enumerate()
477        .filter(|(idx, _)| projections_contains(projection, *idx))
478        .map(|(_, mut builder)| builder.finish())
479        .collect::<Vec<ArrayRef>>();
480    let options = RecordBatchOptions::new().with_row_count(Some(rows.len()));
481    Ok(RecordBatch::try_new_with_options(
482        projected_schema,
483        projected_columns,
484        &options,
485    )?)
486}
487
/// Appends one attribute of an Oracle object to the child builder at
/// `$field_idx` of `$struct_builder`.
///
/// The attribute is looked up by the name of `$field` and read as
/// `Option<$field_value_ty>`; `$convert` maps it to the builder's value and
/// may fail. A missing object (`None`) or a NULL attribute appends a null.
///
/// Must be expanded inside a function returning `Result<_, DataFusionError>`.
macro_rules! append_object_attr {
    ($struct_builder:expr, $field_builder_type:ty, $field:expr, $field_idx:expr, $object_opt:expr, $field_value_ty:ty, $convert:expr) => {{
        let field_builder = $struct_builder
            .field_builder::<$field_builder_type>($field_idx)
            .ok_or_else(|| {
                DataFusionError::Execution(format!(
                    "Failed to get {} field builder for {:?}",
                    stringify!($field_builder_type),
                    $field,
                ))
            })?;
        // Resolve the attribute first; an absent object behaves like a NULL attribute.
        let attr_value: Option<$field_value_ty> = match &$object_opt {
            Some(object) => object
                .get::<Option<$field_value_ty>>($field.name())
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to get {} field value for {:?}: {e:?}",
                        stringify!($field_value_ty),
                        $field,
                    ))
                })?,
            None => None,
        };
        match attr_value {
            Some(field_value) => field_builder.append_value($convert(field_value)?),
            None => field_builder.append_null(),
        }
    }};
}
527
528fn append_object_to_struct_builder(
529    builder: &mut StructBuilder,
530    fields: &Fields,
531    object_opt: Option<Object>,
532) -> DFResult<()> {
533    for (idx, field) in fields.iter().enumerate() {
534        match field.data_type() {
535            DataType::Int32 => {
536                append_object_attr!(
537                    builder,
538                    Int32Builder,
539                    field,
540                    idx,
541                    object_opt,
542                    i32,
543                    just_return
544                );
545            }
546            DataType::Int64 => {
547                append_object_attr!(
548                    builder,
549                    Int64Builder,
550                    field,
551                    idx,
552                    object_opt,
553                    i64,
554                    just_return
555                );
556            }
557            DataType::Float32 => {
558                append_object_attr!(
559                    builder,
560                    Float32Builder,
561                    field,
562                    idx,
563                    object_opt,
564                    f32,
565                    just_return
566                );
567            }
568            DataType::Float64 => {
569                append_object_attr!(
570                    builder,
571                    Float64Builder,
572                    field,
573                    idx,
574                    object_opt,
575                    f64,
576                    just_return
577                );
578            }
579            DataType::Decimal128(_precision, scale) => {
580                append_object_attr!(
581                    builder,
582                    Decimal128Builder,
583                    field,
584                    idx,
585                    object_opt,
586                    String,
587                    |v: String| {
588                        let decimal = v.parse::<bigdecimal::BigDecimal>().map_err(|e| {
589                            DataFusionError::Execution(format!(
590                                "Failed to parse BigDecimal from {v:?}: {e:?}",
591                            ))
592                        })?;
593                        big_decimal_to_i128(&decimal, Some(*scale as i32))
594                    }
595                );
596            }
597            DataType::Binary => {
598                append_object_attr!(
599                    builder,
600                    BinaryBuilder,
601                    field,
602                    idx,
603                    object_opt,
604                    Vec<u8>,
605                    just_return
606                );
607            }
608            DataType::LargeBinary => {
609                append_object_attr!(
610                    builder,
611                    LargeBinaryBuilder,
612                    field,
613                    idx,
614                    object_opt,
615                    Vec<u8>,
616                    just_return
617                );
618            }
619            DataType::Struct(fields) => {
620                let field_builder =
621                    builder.field_builder::<StructBuilder>(idx).ok_or_else(|| {
622                        DataFusionError::Execution(format!(
623                            "Failed to get struct field builder for {field:?}"
624                        ))
625                    })?;
626                match &object_opt {
627                    Some(object) => {
628                        let field_value =
629                            object.get::<Option<Object>>(field.name()).map_err(|e| {
630                                DataFusionError::Execution(format!(
631                                    "Failed to get object value for {field:?}: {e:?}"
632                                ))
633                            })?;
634                        append_object_to_struct_builder(field_builder, fields, field_value)?;
635                    }
636                    None => {
637                        field_builder.append_null();
638                    }
639                }
640            }
641            _ => {
642                return Err(DataFusionError::NotImplemented(format!(
643                    "Unsupported struct field type {}",
644                    field.data_type(),
645                )));
646            }
647        }
648    }
649
650    match &object_opt {
651        Some(_) => {
652            builder.append(true);
653        }
654        None => {
655            builder.append_null();
656        }
657    }
658    Ok(())
659}