datafusion_remote_table/connection/
oracle.rs

1use crate::connection::{RemoteDbType, just_return, projections_contains};
2use crate::utils::big_decimal_to_i128;
3use crate::{
4    Connection, ConnectionOptions, DFResult, Literalize, OracleConnectionOptions, OracleType, Pool,
5    RemoteField, RemoteSchema, RemoteSchemaRef, RemoteSource, RemoteType,
6};
7use bb8_oracle::OracleConnectionManager;
8use datafusion::arrow::array::{
9    ArrayRef, BinaryBuilder, BooleanBuilder, Date64Builder, Decimal128Builder, Float32Builder,
10    Float64Builder, Int16Builder, Int32Builder, Int64Builder, LargeBinaryBuilder,
11    LargeStringBuilder, RecordBatch, RecordBatchOptions, StringBuilder, StructBuilder,
12    TimestampNanosecondBuilder, TimestampSecondBuilder, make_builder,
13};
14use datafusion::arrow::datatypes::{DataType, Fields, SchemaRef, TimeUnit};
15use datafusion::common::{DataFusionError, project_schema};
16use datafusion::execution::SendableRecordBatchStream;
17use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
18use futures::StreamExt;
19use log::debug;
20use oracle::sql_type::{Object, OracleType as ColumnType};
21use oracle::{Connector, Row};
22use std::any::Any;
23use std::sync::Arc;
24
/// Pool of Oracle connections, backed by a `bb8` pool.
#[derive(Debug)]
pub struct OraclePool {
    /// Underlying `bb8` pool whose connections are managed by `OracleConnectionManager`.
    pool: bb8::Pool<OracleConnectionManager>,
}
29
30pub(crate) async fn connect_oracle(options: &OracleConnectionOptions) -> DFResult<OraclePool> {
31    let connect_string = format!(
32        "//{}:{}/{}",
33        options.host, options.port, options.service_name
34    );
35    let connector = Connector::new(
36        options.username.clone(),
37        options.password.clone(),
38        connect_string,
39    );
40    let _ = connector
41        .connect()
42        .map_err(|e| DataFusionError::Internal(format!("Failed to connect to oracle: {e:?}")))?;
43    let manager = OracleConnectionManager::from_connector(connector);
44    let pool = bb8::Pool::builder()
45        .max_size(options.pool_max_size as u32)
46        .build(manager)
47        .await
48        .map_err(|e| DataFusionError::Internal(format!("Failed to create oracle pool: {e:?}")))?;
49    Ok(OraclePool { pool })
50}
51
52#[async_trait::async_trait]
53impl Pool for OraclePool {
54    async fn get(&self) -> DFResult<Arc<dyn Connection>> {
55        let conn = self.pool.get_owned().await.map_err(|e| {
56            DataFusionError::Execution(format!("Failed to get oracle connection due to {e:?}"))
57        })?;
58        Ok(Arc::new(OracleConnection { conn }))
59    }
60}
61
/// A single Oracle connection checked out of an [`OraclePool`].
#[derive(Debug)]
pub struct OracleConnection {
    /// Owned (`'static`) pooled connection obtained from the `bb8` pool.
    conn: bb8::PooledConnection<'static, OracleConnectionManager>,
}
66
67#[async_trait::async_trait]
68impl Connection for OracleConnection {
69    fn as_any(&self) -> &dyn Any {
70        self
71    }
72
73    async fn infer_schema(&self, source: &RemoteSource) -> DFResult<RemoteSchemaRef> {
74        let sql = RemoteDbType::Oracle.limit_1_query_if_possible(source);
75        let result_set = self.conn.query(&sql, &[]).map_err(|e| {
76            DataFusionError::Plan(format!("Failed to execute query {sql} on oracle: {e:?}"))
77        })?;
78        let remote_schema = Arc::new(build_remote_schema(&result_set)?);
79        Ok(remote_schema)
80    }
81
82    async fn query(
83        &self,
84        conn_options: &ConnectionOptions,
85        source: &RemoteSource,
86        table_schema: SchemaRef,
87        projection: Option<&Vec<usize>>,
88        unparsed_filters: &[String],
89        limit: Option<usize>,
90    ) -> DFResult<SendableRecordBatchStream> {
91        let projected_schema = project_schema(&table_schema, projection)?;
92
93        let sql = RemoteDbType::Oracle.rewrite_query(source, unparsed_filters, limit);
94        debug!("[remote-table] executing oracle query: {sql}");
95
96        let projection = projection.cloned();
97        let chunk_size = conn_options.stream_chunk_size();
98        let result_set = self.conn.query(&sql, &[]).map_err(|e| {
99            DataFusionError::Execution(format!("Failed to execute query on oracle: {e:?}"))
100        })?;
101        let stream = futures::stream::iter(result_set).chunks(chunk_size).boxed();
102
103        let stream = stream.map(move |rows| {
104            let rows: Vec<Row> = rows
105                .into_iter()
106                .collect::<Result<Vec<_>, _>>()
107                .map_err(|e| {
108                    DataFusionError::Execution(format!(
109                        "Failed to collect rows from oracle due to {e}",
110                    ))
111                })?;
112            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
113        });
114
115        Ok(Box::pin(RecordBatchStreamAdapter::new(
116            projected_schema,
117            stream,
118        )))
119    }
120
121    async fn insert(
122        &self,
123        _conn_options: &ConnectionOptions,
124        _literalizer: Arc<dyn Literalize>,
125        _table: &[String],
126        _remote_schema: RemoteSchemaRef,
127        _input: SendableRecordBatchStream,
128    ) -> DFResult<usize> {
129        Err(DataFusionError::Execution(
130            "Insert operation is not supported for oracle".to_string(),
131        ))
132    }
133}
134
135fn oracle_type_to_remote_type(oracle_type: &ColumnType) -> DFResult<OracleType> {
136    match oracle_type {
137        ColumnType::Number(precision, scale) => {
138            // TODO need more investigation on the precision and scale
139            let precision = if *precision == 0 { 38 } else { *precision };
140            let scale = if *scale == -127 { 0 } else { *scale };
141            Ok(OracleType::Number(precision, scale))
142        }
143        ColumnType::BinaryFloat => Ok(OracleType::BinaryFloat),
144        ColumnType::BinaryDouble => Ok(OracleType::BinaryDouble),
145        ColumnType::Float(precision) => Ok(OracleType::Float(*precision)),
146        ColumnType::Varchar2(size) => Ok(OracleType::Varchar2(*size)),
147        ColumnType::NVarchar2(size) => Ok(OracleType::NVarchar2(*size)),
148        ColumnType::Char(size) => Ok(OracleType::Char(*size)),
149        ColumnType::NChar(size) => Ok(OracleType::NChar(*size)),
150        ColumnType::Long => Ok(OracleType::Long),
151        ColumnType::CLOB => Ok(OracleType::Clob),
152        ColumnType::NCLOB => Ok(OracleType::NClob),
153        ColumnType::Raw(size) => Ok(OracleType::Raw(*size)),
154        ColumnType::LongRaw => Ok(OracleType::LongRaw),
155        ColumnType::BLOB => Ok(OracleType::Blob),
156        ColumnType::Date => Ok(OracleType::Date),
157        ColumnType::Timestamp(_) => Ok(OracleType::Timestamp),
158        ColumnType::Boolean => Ok(OracleType::Boolean),
159        ColumnType::Object(_) => Ok(OracleType::SdeGeometry),
160        _ => Err(DataFusionError::NotImplemented(format!(
161            "Unsupported oracle type: {oracle_type:?}",
162        ))),
163    }
164}
165
166fn build_remote_schema(result_set: &oracle::ResultSet<Row>) -> DFResult<RemoteSchema> {
167    let mut remote_fields = vec![];
168    for col in result_set.column_info() {
169        let remote_type = RemoteType::Oracle(oracle_type_to_remote_type(col.oracle_type())?);
170        remote_fields.push(RemoteField::new(col.name(), remote_type, col.nullable()));
171    }
172    Ok(RemoteSchema::new(remote_fields))
173}
174
/// Reads the value at `$index` of `$row` as `Option<$value_ty>` and appends it
/// to `$builder`, which must downcast to `$builder_ty`.
///
/// A present value is passed through `$convert` (which returns a `Result`)
/// before being appended; an absent value appends a null. Must be used inside
/// a function returning a `DataFusionError`-compatible `Result`, because read
/// and conversion failures are propagated with `?`. A builder of the wrong
/// concrete type is treated as a broken invariant and panics.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let Some(typed_builder) = $builder.as_any_mut().downcast_mut::<$builder_ty>() else {
            panic!(
                "Failed to downcast builder to {} for {:?} and {:?}",
                stringify!($builder_ty),
                $field,
                $col
            )
        };

        let fetched: Option<$value_ty> =
            $row.get::<usize, Option<$value_ty>>($index).map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to get {} value for {:?} and {:?}: {e:?}",
                    stringify!($value_ty),
                    $field,
                    $col
                ))
            })?;

        if let Some(raw) = fetched {
            typed_builder.append_value($convert(raw)?);
        } else {
            typed_builder.append_null();
        }
    }};
}
203
204fn rows_to_batch(
205    rows: &[Row],
206    table_schema: &SchemaRef,
207    projection: Option<&Vec<usize>>,
208) -> DFResult<RecordBatch> {
209    let projected_schema = project_schema(table_schema, projection)?;
210    let mut array_builders = vec![];
211    for field in table_schema.fields() {
212        let builder = make_builder(field.data_type(), rows.len());
213        array_builders.push(builder);
214    }
215
216    for row in rows {
217        for (idx, field) in table_schema.fields.iter().enumerate() {
218            if !projections_contains(projection, idx) {
219                continue;
220            }
221            let builder = &mut array_builders[idx];
222            let col = row.column_info().get(idx);
223            match field.data_type() {
224                DataType::Int16 => {
225                    handle_primitive_type!(
226                        builder,
227                        field,
228                        col,
229                        Int16Builder,
230                        i16,
231                        row,
232                        idx,
233                        just_return
234                    );
235                }
236                DataType::Int32 => {
237                    handle_primitive_type!(
238                        builder,
239                        field,
240                        col,
241                        Int32Builder,
242                        i32,
243                        row,
244                        idx,
245                        just_return
246                    );
247                }
248                DataType::Int64 => {
249                    handle_primitive_type!(
250                        builder,
251                        field,
252                        col,
253                        Int64Builder,
254                        i64,
255                        row,
256                        idx,
257                        just_return
258                    );
259                }
260                DataType::Float32 => {
261                    handle_primitive_type!(
262                        builder,
263                        field,
264                        col,
265                        Float32Builder,
266                        f32,
267                        row,
268                        idx,
269                        just_return
270                    );
271                }
272                DataType::Float64 => {
273                    handle_primitive_type!(
274                        builder,
275                        field,
276                        col,
277                        Float64Builder,
278                        f64,
279                        row,
280                        idx,
281                        just_return
282                    );
283                }
284                DataType::Utf8 => {
285                    handle_primitive_type!(
286                        builder,
287                        field,
288                        col,
289                        StringBuilder,
290                        String,
291                        row,
292                        idx,
293                        just_return
294                    );
295                }
296                DataType::LargeUtf8 => {
297                    handle_primitive_type!(
298                        builder,
299                        field,
300                        col,
301                        LargeStringBuilder,
302                        String,
303                        row,
304                        idx,
305                        just_return
306                    );
307                }
308                DataType::Decimal128(_precision, scale) => {
309                    handle_primitive_type!(
310                        builder,
311                        field,
312                        col,
313                        Decimal128Builder,
314                        String,
315                        row,
316                        idx,
317                        |v: String| {
318                            let decimal = v.parse::<bigdecimal::BigDecimal>().map_err(|e| {
319                                DataFusionError::Execution(format!(
320                                    "Failed to parse BigDecimal from {v:?}: {e:?}",
321                                ))
322                            })?;
323                            big_decimal_to_i128(&decimal, Some(*scale as i32))
324                        }
325                    );
326                }
327                DataType::Timestamp(TimeUnit::Second, None) => {
328                    handle_primitive_type!(
329                        builder,
330                        field,
331                        col,
332                        TimestampSecondBuilder,
333                        chrono::NaiveDateTime,
334                        row,
335                        idx,
336                        |v: chrono::NaiveDateTime| {
337                            let t = v.and_utc().timestamp();
338                            Ok::<_, DataFusionError>(t)
339                        }
340                    );
341                }
342                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
343                    handle_primitive_type!(
344                        builder,
345                        field,
346                        col,
347                        TimestampNanosecondBuilder,
348                        chrono::NaiveDateTime,
349                        row,
350                        idx,
351                        |v: chrono::NaiveDateTime| {
352                            v.and_utc().timestamp_nanos_opt().ok_or_else(|| {
353                                DataFusionError::Execution(format!(
354                                    "Failed to convert chrono::NaiveDateTime {v} to nanos timestamp"
355                                ))
356                            })
357                        }
358                    );
359                }
360                DataType::Date64 => {
361                    handle_primitive_type!(
362                        builder,
363                        field,
364                        col,
365                        Date64Builder,
366                        chrono::NaiveDateTime,
367                        row,
368                        idx,
369                        |v: chrono::NaiveDateTime| {
370                            Ok::<_, DataFusionError>(v.and_utc().timestamp_millis())
371                        }
372                    );
373                }
374                DataType::Boolean => {
375                    handle_primitive_type!(
376                        builder,
377                        field,
378                        col,
379                        BooleanBuilder,
380                        bool,
381                        row,
382                        idx,
383                        just_return
384                    );
385                }
386                DataType::Binary => {
387                    handle_primitive_type!(
388                        builder,
389                        field,
390                        col,
391                        BinaryBuilder,
392                        Vec<u8>,
393                        row,
394                        idx,
395                        just_return
396                    );
397                }
398                DataType::LargeBinary => {
399                    handle_primitive_type!(
400                        builder,
401                        field,
402                        col,
403                        LargeBinaryBuilder,
404                        Vec<u8>,
405                        row,
406                        idx,
407                        just_return
408                    );
409                }
410                DataType::Struct(fields) => {
411                    let builder = builder
412                        .as_any_mut()
413                        .downcast_mut::<StructBuilder>()
414                        .unwrap_or_else(|| {
415                            panic!("Failed to downcast builder to StructBuilder for {field:?} and {col:?}")
416                        });
417                    let object = row.get::<usize, Option<Object>>(idx).map_err(|e| {
418                        DataFusionError::Execution(format!(
419                            "Failed to get object for {field:?} and {col:?}: {e:?}"
420                        ))
421                    })?;
422                    append_object_to_struct_builder(builder, fields, object)?;
423                }
424                _ => {
425                    return Err(DataFusionError::NotImplemented(format!(
426                        "Unsupported data type {:?} for col: {:?}",
427                        field.data_type(),
428                        col
429                    )));
430                }
431            }
432        }
433    }
434
435    let projected_columns = array_builders
436        .into_iter()
437        .enumerate()
438        .filter(|(idx, _)| projections_contains(projection, *idx))
439        .map(|(_, mut builder)| builder.finish())
440        .collect::<Vec<ArrayRef>>();
441    let options = RecordBatchOptions::new().with_row_count(Some(rows.len()));
442    Ok(RecordBatch::try_new_with_options(
443        projected_schema,
444        projected_columns,
445        &options,
446    )?)
447}
448
/// Appends one attribute of an optional Oracle object to the child builder at
/// `$field_idx` of `$struct_builder`.
///
/// The attribute is looked up by the name of `$field` and read as
/// `Option<$field_value_ty>`. A present value goes through `$convert` (which
/// returns a `Result`) before being appended; a missing object or a missing
/// attribute value appends a null. Must be used inside a function returning a
/// `DataFusionError`-compatible `Result`, because failures propagate with `?`.
macro_rules! append_object_attr {
    ($struct_builder:expr, $field_builder_type:ty, $field:expr, $field_idx:expr, $object_opt:expr, $field_value_ty:ty, $convert:expr) => {{
        let attr_builder = match $struct_builder.field_builder::<$field_builder_type>($field_idx)
        {
            Some(child) => Ok(child),
            None => Err(DataFusionError::Execution(format!(
                "Failed to get {} field builder for {:?}",
                stringify!($field_builder_type),
                $field,
            ))),
        }?;

        // Without an object there is nothing to read: the attribute is null.
        let attr_value: Option<$field_value_ty> = match &$object_opt {
            None => None,
            Some(object) => object
                .get::<Option<$field_value_ty>>($field.name())
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to get {} field value for {:?}: {e:?}",
                        stringify!($field_value_ty),
                        $field,
                    ))
                })?,
        };

        if let Some(present) = attr_value {
            attr_builder.append_value($convert(present)?);
        } else {
            attr_builder.append_null();
        }
    }};
}
488
489fn append_object_to_struct_builder(
490    builder: &mut StructBuilder,
491    fields: &Fields,
492    object_opt: Option<Object>,
493) -> DFResult<()> {
494    for (idx, field) in fields.iter().enumerate() {
495        match field.data_type() {
496            DataType::Int32 => {
497                append_object_attr!(
498                    builder,
499                    Int32Builder,
500                    field,
501                    idx,
502                    object_opt,
503                    i32,
504                    just_return
505                );
506            }
507            DataType::Int64 => {
508                append_object_attr!(
509                    builder,
510                    Int64Builder,
511                    field,
512                    idx,
513                    object_opt,
514                    i64,
515                    just_return
516                );
517            }
518            DataType::Float32 => {
519                append_object_attr!(
520                    builder,
521                    Float32Builder,
522                    field,
523                    idx,
524                    object_opt,
525                    f32,
526                    just_return
527                );
528            }
529            DataType::Float64 => {
530                append_object_attr!(
531                    builder,
532                    Float64Builder,
533                    field,
534                    idx,
535                    object_opt,
536                    f64,
537                    just_return
538                );
539            }
540            DataType::Decimal128(_precision, scale) => {
541                append_object_attr!(
542                    builder,
543                    Decimal128Builder,
544                    field,
545                    idx,
546                    object_opt,
547                    String,
548                    |v: String| {
549                        let decimal = v.parse::<bigdecimal::BigDecimal>().map_err(|e| {
550                            DataFusionError::Execution(format!(
551                                "Failed to parse BigDecimal from {v:?}: {e:?}",
552                            ))
553                        })?;
554                        big_decimal_to_i128(&decimal, Some(*scale as i32))
555                    }
556                );
557            }
558            DataType::Binary => {
559                append_object_attr!(
560                    builder,
561                    BinaryBuilder,
562                    field,
563                    idx,
564                    object_opt,
565                    Vec<u8>,
566                    just_return
567                );
568            }
569            DataType::LargeBinary => {
570                append_object_attr!(
571                    builder,
572                    LargeBinaryBuilder,
573                    field,
574                    idx,
575                    object_opt,
576                    Vec<u8>,
577                    just_return
578                );
579            }
580            DataType::Struct(fields) => {
581                let field_builder =
582                    builder.field_builder::<StructBuilder>(idx).ok_or_else(|| {
583                        DataFusionError::Execution(format!(
584                            "Failed to get struct field builder for {field:?}"
585                        ))
586                    })?;
587                match &object_opt {
588                    Some(object) => {
589                        let field_value =
590                            object.get::<Option<Object>>(field.name()).map_err(|e| {
591                                DataFusionError::Execution(format!(
592                                    "Failed to get object value for {field:?}: {e:?}"
593                                ))
594                            })?;
595                        append_object_to_struct_builder(field_builder, fields, field_value)?;
596                    }
597                    None => {
598                        field_builder.append_null();
599                    }
600                }
601            }
602            _ => {
603                return Err(DataFusionError::NotImplemented(format!(
604                    "Unsupported struct field type {}",
605                    field.data_type(),
606                )));
607            }
608        }
609    }
610
611    match &object_opt {
612        Some(_) => {
613            builder.append(true);
614        }
615        None => {
616            builder.append_null();
617        }
618    }
619    Ok(())
620}