1use crate::connection::{RemoteDbType, just_return, projections_contains};
2use crate::utils::big_decimal_to_i128;
3use crate::{
4 Connection, ConnectionOptions, DFResult, Literalize, OracleType, Pool, RemoteField,
5 RemoteSchema, RemoteSchemaRef, RemoteSource, RemoteType,
6};
7use bb8_oracle::OracleConnectionManager;
8use datafusion::arrow::array::{
9 ArrayRef, BinaryBuilder, BooleanBuilder, Date64Builder, Decimal128Builder, Float32Builder,
10 Float64Builder, Int16Builder, Int32Builder, Int64Builder, LargeBinaryBuilder,
11 LargeStringBuilder, RecordBatch, RecordBatchOptions, StringBuilder, StructBuilder,
12 TimestampNanosecondBuilder, TimestampSecondBuilder, make_builder,
13};
14use datafusion::arrow::datatypes::{DataType, Fields, SchemaRef, TimeUnit};
15use datafusion::common::{DataFusionError, project_schema};
16use datafusion::execution::SendableRecordBatchStream;
17use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
18use derive_getters::Getters;
19use derive_with::With;
20use futures::StreamExt;
21use log::debug;
22use oracle::sql_type::{Object, OracleType as ColumnType};
23use oracle::{Connector, Row};
24use std::any::Any;
25use std::sync::Arc;
26
/// Connection settings for an Oracle database.
///
/// `new` fills `pool_max_size` with 10 and `stream_chunk_size` with 2048; the
/// derived `With` setters can override any field.
#[derive(Debug, Clone, With, Getters)]
pub struct OracleConnectionOptions {
    /// Server host; used as the `host` part of the `//host:port/service_name`
    /// connect string built by `connect_oracle`.
    pub(crate) host: String,
    /// Server port; used as the `port` part of the connect string.
    pub(crate) port: u16,
    /// User name passed to `oracle::Connector::new`.
    pub(crate) username: String,
    /// Password passed to `oracle::Connector::new`.
    pub(crate) password: String,
    /// Service name; used as the path part of the connect string.
    pub(crate) service_name: String,
    /// Maximum number of pooled connections, handed to the bb8 pool builder.
    pub(crate) pool_max_size: usize,
    /// Rows per chunk when streaming query results. Presumably surfaced through
    /// `ConnectionOptions::stream_chunk_size` — confirm.
    pub(crate) stream_chunk_size: usize,
}
37
38impl OracleConnectionOptions {
39 pub fn new(
40 host: impl Into<String>,
41 port: u16,
42 username: impl Into<String>,
43 password: impl Into<String>,
44 service_name: impl Into<String>,
45 ) -> Self {
46 Self {
47 host: host.into(),
48 port,
49 username: username.into(),
50 password: password.into(),
51 service_name: service_name.into(),
52 pool_max_size: 10,
53 stream_chunk_size: 2048,
54 }
55 }
56}
57
58impl From<OracleConnectionOptions> for ConnectionOptions {
59 fn from(options: OracleConnectionOptions) -> Self {
60 ConnectionOptions::Oracle(options)
61 }
62}
63
/// A bb8 pool of Oracle connections, created by `connect_oracle`.
#[derive(Debug)]
pub struct OraclePool {
    /// Underlying bb8 pool; connections are checked out with `get_owned`.
    pool: bb8::Pool<OracleConnectionManager>,
}
68
69pub(crate) async fn connect_oracle(options: &OracleConnectionOptions) -> DFResult<OraclePool> {
70 let connect_string = format!(
71 "//{}:{}/{}",
72 options.host, options.port, options.service_name
73 );
74 let connector = Connector::new(
75 options.username.clone(),
76 options.password.clone(),
77 connect_string,
78 );
79 let _ = connector
80 .connect()
81 .map_err(|e| DataFusionError::Internal(format!("Failed to connect to oracle: {e:?}")))?;
82 let manager = OracleConnectionManager::from_connector(connector);
83 let pool = bb8::Pool::builder()
84 .max_size(options.pool_max_size as u32)
85 .build(manager)
86 .await
87 .map_err(|e| DataFusionError::Internal(format!("Failed to create oracle pool: {e:?}")))?;
88 Ok(OraclePool { pool })
89}
90
91#[async_trait::async_trait]
92impl Pool for OraclePool {
93 async fn get(&self) -> DFResult<Arc<dyn Connection>> {
94 let conn = self.pool.get_owned().await.map_err(|e| {
95 DataFusionError::Execution(format!("Failed to get oracle connection due to {e:?}"))
96 })?;
97 Ok(Arc::new(OracleConnection { conn }))
98 }
99}
100
/// A single Oracle connection checked out of an `OraclePool`.
#[derive(Debug)]
pub struct OracleConnection {
    /// Pooled connection. The `'static` lifetime matches `bb8::Pool::get_owned`,
    /// which `OraclePool::get` uses to obtain it; the connection returns to the
    /// pool when this value is dropped (bb8 pooled-connection semantics).
    conn: bb8::PooledConnection<'static, OracleConnectionManager>,
}
105
106#[async_trait::async_trait]
107impl Connection for OracleConnection {
108 fn as_any(&self) -> &dyn Any {
109 self
110 }
111
112 async fn infer_schema(&self, source: &RemoteSource) -> DFResult<RemoteSchemaRef> {
113 let sql = RemoteDbType::Oracle.limit_1_query_if_possible(source);
114 let result_set = self.conn.query(&sql, &[]).map_err(|e| {
115 DataFusionError::Execution(format!("Failed to execute query {sql} on oracle: {e:?}"))
116 })?;
117 let remote_schema = Arc::new(build_remote_schema(&result_set)?);
118 Ok(remote_schema)
119 }
120
121 async fn query(
122 &self,
123 conn_options: &ConnectionOptions,
124 source: &RemoteSource,
125 table_schema: SchemaRef,
126 projection: Option<&Vec<usize>>,
127 unparsed_filters: &[String],
128 limit: Option<usize>,
129 ) -> DFResult<SendableRecordBatchStream> {
130 let projected_schema = project_schema(&table_schema, projection)?;
131
132 let sql = RemoteDbType::Oracle.rewrite_query(source, unparsed_filters, limit);
133 debug!("[remote-table] executing oracle query: {sql}");
134
135 let projection = projection.cloned();
136 let chunk_size = conn_options.stream_chunk_size();
137 let result_set = self.conn.query(&sql, &[]).map_err(|e| {
138 DataFusionError::Execution(format!("Failed to execute query on oracle: {e:?}"))
139 })?;
140 let stream = futures::stream::iter(result_set).chunks(chunk_size).boxed();
141
142 let stream = stream.map(move |rows| {
143 let rows: Vec<Row> = rows
144 .into_iter()
145 .collect::<Result<Vec<_>, _>>()
146 .map_err(|e| {
147 DataFusionError::Execution(format!(
148 "Failed to collect rows from oracle due to {e}",
149 ))
150 })?;
151 rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
152 });
153
154 Ok(Box::pin(RecordBatchStreamAdapter::new(
155 projected_schema,
156 stream,
157 )))
158 }
159
160 async fn insert(
161 &self,
162 _conn_options: &ConnectionOptions,
163 _literalizer: Arc<dyn Literalize>,
164 _table: &[String],
165 _remote_schema: RemoteSchemaRef,
166 _input: SendableRecordBatchStream,
167 ) -> DFResult<usize> {
168 Err(DataFusionError::Execution(
169 "Insert operation is not supported for oracle".to_string(),
170 ))
171 }
172}
173
174fn oracle_type_to_remote_type(oracle_type: &ColumnType) -> DFResult<OracleType> {
175 match oracle_type {
176 ColumnType::Number(precision, scale) => {
177 let precision = if *precision == 0 { 38 } else { *precision };
179 let scale = if *scale == -127 { 0 } else { *scale };
180 Ok(OracleType::Number(precision, scale))
181 }
182 ColumnType::BinaryFloat => Ok(OracleType::BinaryFloat),
183 ColumnType::BinaryDouble => Ok(OracleType::BinaryDouble),
184 ColumnType::Float(precision) => Ok(OracleType::Float(*precision)),
185 ColumnType::Varchar2(size) => Ok(OracleType::Varchar2(*size)),
186 ColumnType::NVarchar2(size) => Ok(OracleType::NVarchar2(*size)),
187 ColumnType::Char(size) => Ok(OracleType::Char(*size)),
188 ColumnType::NChar(size) => Ok(OracleType::NChar(*size)),
189 ColumnType::Long => Ok(OracleType::Long),
190 ColumnType::CLOB => Ok(OracleType::Clob),
191 ColumnType::NCLOB => Ok(OracleType::NClob),
192 ColumnType::Raw(size) => Ok(OracleType::Raw(*size)),
193 ColumnType::LongRaw => Ok(OracleType::LongRaw),
194 ColumnType::BLOB => Ok(OracleType::Blob),
195 ColumnType::Date => Ok(OracleType::Date),
196 ColumnType::Timestamp(_) => Ok(OracleType::Timestamp),
197 ColumnType::Boolean => Ok(OracleType::Boolean),
198 ColumnType::Object(_) => Ok(OracleType::SdeGeometry),
199 _ => Err(DataFusionError::NotImplemented(format!(
200 "Unsupported oracle type: {oracle_type:?}",
201 ))),
202 }
203}
204
205fn build_remote_schema(result_set: &oracle::ResultSet<Row>) -> DFResult<RemoteSchema> {
206 let mut remote_fields = vec![];
207 for col in result_set.column_info() {
208 let remote_type = RemoteType::Oracle(oracle_type_to_remote_type(col.oracle_type())?);
209 remote_fields.push(RemoteField::new(col.name(), remote_type, col.nullable()));
210 }
211 Ok(RemoteSchema::new(remote_fields))
212}
213
/// Reads column `$index` of `$row` as `Option<$value_ty>` and appends it to
/// `$builder`, downcast to `$builder_ty`.
///
/// Non-null values are passed through `$convert`, which must return a `Result`
/// usable with `?`; `None` values are appended as nulls. `$field` and `$col`
/// only feed the panic/error messages. Must be expanded inside a function that
/// returns a `Result`, because it uses `?`.
///
/// Panics if `$builder` is not actually a `$builder_ty`.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        // The builder was created from the field's data type, so a failed
        // downcast means the match arm and builder type disagree (a bug).
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to {} for {:?} and {:?}",
                    stringify!($builder_ty),
                    $field,
                    $col
                )
            });
        // Read as `Option` so a missing value can be appended as a null below.
        let v = $row.get::<usize, Option<$value_ty>>($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        match v {
            Some(v) => builder.append_value($convert(v)?),
            None => builder.append_null(),
        }
    }};
}
242
243fn rows_to_batch(
244 rows: &[Row],
245 table_schema: &SchemaRef,
246 projection: Option<&Vec<usize>>,
247) -> DFResult<RecordBatch> {
248 let projected_schema = project_schema(table_schema, projection)?;
249 let mut array_builders = vec![];
250 for field in table_schema.fields() {
251 let builder = make_builder(field.data_type(), rows.len());
252 array_builders.push(builder);
253 }
254
255 for row in rows {
256 for (idx, field) in table_schema.fields.iter().enumerate() {
257 if !projections_contains(projection, idx) {
258 continue;
259 }
260 let builder = &mut array_builders[idx];
261 let col = row.column_info().get(idx);
262 match field.data_type() {
263 DataType::Int16 => {
264 handle_primitive_type!(
265 builder,
266 field,
267 col,
268 Int16Builder,
269 i16,
270 row,
271 idx,
272 just_return
273 );
274 }
275 DataType::Int32 => {
276 handle_primitive_type!(
277 builder,
278 field,
279 col,
280 Int32Builder,
281 i32,
282 row,
283 idx,
284 just_return
285 );
286 }
287 DataType::Int64 => {
288 handle_primitive_type!(
289 builder,
290 field,
291 col,
292 Int64Builder,
293 i64,
294 row,
295 idx,
296 just_return
297 );
298 }
299 DataType::Float32 => {
300 handle_primitive_type!(
301 builder,
302 field,
303 col,
304 Float32Builder,
305 f32,
306 row,
307 idx,
308 just_return
309 );
310 }
311 DataType::Float64 => {
312 handle_primitive_type!(
313 builder,
314 field,
315 col,
316 Float64Builder,
317 f64,
318 row,
319 idx,
320 just_return
321 );
322 }
323 DataType::Utf8 => {
324 handle_primitive_type!(
325 builder,
326 field,
327 col,
328 StringBuilder,
329 String,
330 row,
331 idx,
332 just_return
333 );
334 }
335 DataType::LargeUtf8 => {
336 handle_primitive_type!(
337 builder,
338 field,
339 col,
340 LargeStringBuilder,
341 String,
342 row,
343 idx,
344 just_return
345 );
346 }
347 DataType::Decimal128(_precision, scale) => {
348 handle_primitive_type!(
349 builder,
350 field,
351 col,
352 Decimal128Builder,
353 String,
354 row,
355 idx,
356 |v: String| {
357 let decimal = v.parse::<bigdecimal::BigDecimal>().map_err(|e| {
358 DataFusionError::Execution(format!(
359 "Failed to parse BigDecimal from {v:?}: {e:?}",
360 ))
361 })?;
362 big_decimal_to_i128(&decimal, Some(*scale as i32))
363 }
364 );
365 }
366 DataType::Timestamp(TimeUnit::Second, None) => {
367 handle_primitive_type!(
368 builder,
369 field,
370 col,
371 TimestampSecondBuilder,
372 chrono::NaiveDateTime,
373 row,
374 idx,
375 |v: chrono::NaiveDateTime| {
376 let t = v.and_utc().timestamp();
377 Ok::<_, DataFusionError>(t)
378 }
379 );
380 }
381 DataType::Timestamp(TimeUnit::Nanosecond, None) => {
382 handle_primitive_type!(
383 builder,
384 field,
385 col,
386 TimestampNanosecondBuilder,
387 chrono::NaiveDateTime,
388 row,
389 idx,
390 |v: chrono::NaiveDateTime| {
391 v.and_utc().timestamp_nanos_opt().ok_or_else(|| {
392 DataFusionError::Execution(format!(
393 "Failed to convert chrono::NaiveDateTime {v} to nanos timestamp"
394 ))
395 })
396 }
397 );
398 }
399 DataType::Date64 => {
400 handle_primitive_type!(
401 builder,
402 field,
403 col,
404 Date64Builder,
405 chrono::NaiveDateTime,
406 row,
407 idx,
408 |v: chrono::NaiveDateTime| {
409 Ok::<_, DataFusionError>(v.and_utc().timestamp_millis())
410 }
411 );
412 }
413 DataType::Boolean => {
414 handle_primitive_type!(
415 builder,
416 field,
417 col,
418 BooleanBuilder,
419 bool,
420 row,
421 idx,
422 just_return
423 );
424 }
425 DataType::Binary => {
426 handle_primitive_type!(
427 builder,
428 field,
429 col,
430 BinaryBuilder,
431 Vec<u8>,
432 row,
433 idx,
434 just_return
435 );
436 }
437 DataType::LargeBinary => {
438 handle_primitive_type!(
439 builder,
440 field,
441 col,
442 LargeBinaryBuilder,
443 Vec<u8>,
444 row,
445 idx,
446 just_return
447 );
448 }
449 DataType::Struct(fields) => {
450 let builder = builder
451 .as_any_mut()
452 .downcast_mut::<StructBuilder>()
453 .unwrap_or_else(|| {
454 panic!("Failed to downcast builder to StructBuilder for {field:?} and {col:?}")
455 });
456 let object = row.get::<usize, Option<Object>>(idx).map_err(|e| {
457 DataFusionError::Execution(format!(
458 "Failed to get object for {field:?} and {col:?}: {e:?}"
459 ))
460 })?;
461 append_object_to_struct_builder(builder, fields, object)?;
462 }
463 _ => {
464 return Err(DataFusionError::NotImplemented(format!(
465 "Unsupported data type {:?} for col: {:?}",
466 field.data_type(),
467 col
468 )));
469 }
470 }
471 }
472 }
473
474 let projected_columns = array_builders
475 .into_iter()
476 .enumerate()
477 .filter(|(idx, _)| projections_contains(projection, *idx))
478 .map(|(_, mut builder)| builder.finish())
479 .collect::<Vec<ArrayRef>>();
480 let options = RecordBatchOptions::new().with_row_count(Some(rows.len()));
481 Ok(RecordBatch::try_new_with_options(
482 projected_schema,
483 projected_columns,
484 &options,
485 )?)
486}
487
/// Appends the attribute named `$field.name()` of `$object_opt` (an
/// `Option<Object>`) to child builder `$field_idx` of `$struct_builder`,
/// fetched as a `$field_builder_type`.
///
/// The attribute is read as `Option<$field_value_ty>`. Non-null values are
/// passed through `$convert`. A null attribute or a `None` object appends a
/// null, so the child builder always grows by exactly one entry. Must be
/// expanded inside a function that returns a `Result`, because it uses `?`.
macro_rules! append_object_attr {
    ($struct_builder:expr, $field_builder_type:ty, $field:expr, $field_idx:expr, $object_opt:expr, $field_value_ty:ty, $convert:expr) => {{
        let field_builder = $struct_builder
            .field_builder::<$field_builder_type>($field_idx)
            .ok_or_else(|| {
                DataFusionError::Execution(format!(
                    "Failed to get {} field builder for {:?}",
                    stringify!($field_builder_type),
                    $field,
                ))
            })?;
        match &$object_opt {
            Some(object) => {
                // Attributes are looked up by the Arrow field's name.
                let field_name = $field.name();
                let field_value_opt =
                    object
                        .get::<Option<$field_value_ty>>(field_name)
                        .map_err(|e| {
                            DataFusionError::Execution(format!(
                                "Failed to get {} field value for {:?}: {e:?}",
                                stringify!($field_value_ty),
                                $field,
                            ))
                        })?;
                match field_value_opt {
                    Some(field_value) => {
                        field_builder.append_value($convert(field_value)?);
                    }
                    None => {
                        field_builder.append_null();
                    }
                }
            }
            // No object: the child still needs an entry to stay aligned.
            None => {
                field_builder.append_null();
            }
        }
    }};
}
527
528fn append_object_to_struct_builder(
529 builder: &mut StructBuilder,
530 fields: &Fields,
531 object_opt: Option<Object>,
532) -> DFResult<()> {
533 for (idx, field) in fields.iter().enumerate() {
534 match field.data_type() {
535 DataType::Int32 => {
536 append_object_attr!(
537 builder,
538 Int32Builder,
539 field,
540 idx,
541 object_opt,
542 i32,
543 just_return
544 );
545 }
546 DataType::Int64 => {
547 append_object_attr!(
548 builder,
549 Int64Builder,
550 field,
551 idx,
552 object_opt,
553 i64,
554 just_return
555 );
556 }
557 DataType::Float32 => {
558 append_object_attr!(
559 builder,
560 Float32Builder,
561 field,
562 idx,
563 object_opt,
564 f32,
565 just_return
566 );
567 }
568 DataType::Float64 => {
569 append_object_attr!(
570 builder,
571 Float64Builder,
572 field,
573 idx,
574 object_opt,
575 f64,
576 just_return
577 );
578 }
579 DataType::Decimal128(_precision, scale) => {
580 append_object_attr!(
581 builder,
582 Decimal128Builder,
583 field,
584 idx,
585 object_opt,
586 String,
587 |v: String| {
588 let decimal = v.parse::<bigdecimal::BigDecimal>().map_err(|e| {
589 DataFusionError::Execution(format!(
590 "Failed to parse BigDecimal from {v:?}: {e:?}",
591 ))
592 })?;
593 big_decimal_to_i128(&decimal, Some(*scale as i32))
594 }
595 );
596 }
597 DataType::Binary => {
598 append_object_attr!(
599 builder,
600 BinaryBuilder,
601 field,
602 idx,
603 object_opt,
604 Vec<u8>,
605 just_return
606 );
607 }
608 DataType::LargeBinary => {
609 append_object_attr!(
610 builder,
611 LargeBinaryBuilder,
612 field,
613 idx,
614 object_opt,
615 Vec<u8>,
616 just_return
617 );
618 }
619 DataType::Struct(fields) => {
620 let field_builder =
621 builder.field_builder::<StructBuilder>(idx).ok_or_else(|| {
622 DataFusionError::Execution(format!(
623 "Failed to get struct field builder for {field:?}"
624 ))
625 })?;
626 match &object_opt {
627 Some(object) => {
628 let field_value =
629 object.get::<Option<Object>>(field.name()).map_err(|e| {
630 DataFusionError::Execution(format!(
631 "Failed to get object value for {field:?}: {e:?}"
632 ))
633 })?;
634 append_object_to_struct_builder(field_builder, fields, field_value)?;
635 }
636 None => {
637 field_builder.append_null();
638 }
639 }
640 }
641 _ => {
642 return Err(DataFusionError::NotImplemented(format!(
643 "Unsupported struct field type {}",
644 field.data_type(),
645 )));
646 }
647 }
648 }
649
650 match &object_opt {
651 Some(_) => {
652 builder.append(true);
653 }
654 None => {
655 builder.append_null();
656 }
657 }
658 Ok(())
659}