1use crate::connection::{RemoteDbType, just_return, projections_contains};
2use crate::utils::big_decimal_to_i128;
3use crate::{
4 Connection, ConnectionOptions, DFResult, Literalize, OracleConnectionOptions, OracleType, Pool,
5 PoolState, RemoteField, RemoteSchema, RemoteSchemaRef, RemoteSource, RemoteType,
6};
7use arrow::array::{
8 ArrayRef, BinaryBuilder, BinaryViewBuilder, BooleanBuilder, Date64Builder, Decimal128Builder,
9 Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, LargeBinaryBuilder,
10 LargeStringBuilder, RecordBatch, RecordBatchOptions, StringBuilder, StringViewBuilder,
11 StructBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder, make_builder,
12};
13use arrow::datatypes::{DataType, Fields, SchemaRef, TimeUnit};
14use bb8_oracle::OracleConnectionManager;
15use datafusion_common::{DataFusionError, project_schema};
16use datafusion_execution::SendableRecordBatchStream;
17use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
18use futures::StreamExt;
19use log::debug;
20use oracle::sql_type::{Object, OracleType as ColumnType};
21use oracle::{Connector, Row};
22use std::any::Any;
23use std::sync::Arc;
24
/// Connection pool for Oracle, backed by bb8's generic async pool over
/// the `OracleConnectionManager`.
#[derive(Debug)]
pub struct OraclePool {
    // Shared async pool; connections are checked out in `Pool::get`.
    pool: bb8::Pool<OracleConnectionManager>,
}
29
30pub(crate) async fn connect_oracle(options: &OracleConnectionOptions) -> DFResult<OraclePool> {
31 let connect_string = format!(
32 "//{}:{}/{}",
33 options.host, options.port, options.service_name
34 );
35 let connector = Connector::new(
36 options.username.clone(),
37 options.password.clone(),
38 connect_string,
39 );
40 let _ = connector
41 .connect()
42 .map_err(|e| DataFusionError::Internal(format!("Failed to connect to oracle: {e:?}")))?;
43 let manager = OracleConnectionManager::from_connector(connector);
44 let pool = bb8::Pool::builder()
45 .max_size(options.pool_max_size as u32)
46 .min_idle(Some(options.pool_min_idle as u32))
47 .idle_timeout(Some(options.pool_idle_timeout))
48 .reaper_rate(options.pool_ttl_check_interval)
49 .build(manager)
50 .await
51 .map_err(|e| DataFusionError::Internal(format!("Failed to create oracle pool: {e:?}")))?;
52 Ok(OraclePool { pool })
53}
54
55#[async_trait::async_trait]
56impl Pool for OraclePool {
57 async fn get(&self) -> DFResult<Arc<dyn Connection>> {
58 let conn = self.pool.get_owned().await.map_err(|e| {
59 DataFusionError::Execution(format!("Failed to get oracle connection due to {e:?}"))
60 })?;
61 Ok(Arc::new(OracleConnection { conn }))
62 }
63
64 async fn state(&self) -> DFResult<PoolState> {
65 let bb8_state = self.pool.state();
66 Ok(PoolState {
67 connections: bb8_state.connections as usize,
68 idle_connections: bb8_state.idle_connections as usize,
69 })
70 }
71}
72
/// A single checked-out Oracle connection.
#[derive(Debug)]
pub struct OracleConnection {
    // Owned bb8 guard; presumably returned to the pool on drop
    // (standard bb8 `PooledConnection` semantics — confirm against bb8 docs).
    conn: bb8::PooledConnection<'static, OracleConnectionManager>,
}
77
#[async_trait::async_trait]
impl Connection for OracleConnection {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Infers the remote schema by executing the source query (limited to a
    /// single row where the dialect allows it) and reading the result set's
    /// column metadata — no rows are materialized into the schema.
    async fn infer_schema(&self, source: &RemoteSource) -> DFResult<RemoteSchemaRef> {
        let sql = RemoteDbType::Oracle.limit_1_query_if_possible(source);
        let result_set = self.conn.query(&sql, &[]).map_err(|e| {
            DataFusionError::Plan(format!("Failed to execute query {sql} on oracle: {e:?}"))
        })?;
        let remote_schema = Arc::new(build_remote_schema(&result_set)?);
        Ok(remote_schema)
    }

    /// Executes `source` rewritten with the pushed-down filters and limit and
    /// returns a stream of Arrow batches, `chunk_size` rows per batch.
    /// Projection is applied per batch inside `rows_to_batch`.
    async fn query(
        &self,
        conn_options: &ConnectionOptions,
        source: &RemoteSource,
        table_schema: SchemaRef,
        projection: Option<&Vec<usize>>,
        unparsed_filters: &[String],
        limit: Option<usize>,
    ) -> DFResult<SendableRecordBatchStream> {
        // Schema the consumer sees: table schema narrowed to the projection.
        let projected_schema = project_schema(&table_schema, projection)?;

        let sql = RemoteDbType::Oracle.rewrite_query(source, unparsed_filters, limit);
        debug!("[remote-table] executing oracle query: {sql}");

        // Owned copies so the `move` closure below can outlive this call.
        let projection = projection.cloned();
        let chunk_size = conn_options.stream_chunk_size();
        let result_set = self.conn.query(&sql, &[]).map_err(|e| {
            DataFusionError::Execution(format!("Failed to execute query on oracle: {e:?}"))
        })?;
        // Rows are pulled from the ResultSet iterator lazily as the stream is
        // polled, grouped into chunks of `chunk_size`.
        let stream = futures::stream::iter(result_set).chunks(chunk_size).boxed();

        let stream = stream.map(move |rows| {
            // Each chunk is Vec<Result<Row, _>>; fail the whole batch on the
            // first row-level error.
            let rows: Vec<Row> = rows
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to collect rows from oracle due to {e}",
                    ))
                })?;
            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            projected_schema,
            stream,
        )))
    }

    /// Writes are not supported by this connector; always returns an error.
    async fn insert(
        &self,
        _conn_options: &ConnectionOptions,
        _literalizer: Arc<dyn Literalize>,
        _table: &[String],
        _remote_schema: RemoteSchemaRef,
        _batch: RecordBatch,
    ) -> DFResult<usize> {
        Err(DataFusionError::Execution(
            "Insert operation is not supported for oracle".to_string(),
        ))
    }
}
145
146fn oracle_type_to_remote_type(oracle_type: &ColumnType) -> DFResult<OracleType> {
147 match oracle_type {
148 ColumnType::Number(precision, scale) => {
149 let precision = if *precision == 0 { 38 } else { *precision };
151 let scale = if *scale == -127 { 0 } else { *scale };
152 Ok(OracleType::Number(precision, scale))
153 }
154 ColumnType::BinaryFloat => Ok(OracleType::BinaryFloat),
155 ColumnType::BinaryDouble => Ok(OracleType::BinaryDouble),
156 ColumnType::Float(precision) => Ok(OracleType::Float(*precision)),
157 ColumnType::Varchar2(size) => Ok(OracleType::Varchar2(*size)),
158 ColumnType::NVarchar2(size) => Ok(OracleType::NVarchar2(*size)),
159 ColumnType::Char(size) => Ok(OracleType::Char(*size)),
160 ColumnType::NChar(size) => Ok(OracleType::NChar(*size)),
161 ColumnType::Long => Ok(OracleType::Long),
162 ColumnType::CLOB => Ok(OracleType::Clob),
163 ColumnType::NCLOB => Ok(OracleType::NClob),
164 ColumnType::Raw(size) => Ok(OracleType::Raw(*size)),
165 ColumnType::LongRaw => Ok(OracleType::LongRaw),
166 ColumnType::BLOB => Ok(OracleType::Blob),
167 ColumnType::Date => Ok(OracleType::Date),
168 ColumnType::Timestamp(_) => Ok(OracleType::Timestamp),
169 ColumnType::Boolean => Ok(OracleType::Boolean),
170 ColumnType::Object(_) => Ok(OracleType::SdeGeometry),
171 _ => Err(DataFusionError::NotImplemented(format!(
172 "Unsupported oracle type: {oracle_type:?}",
173 ))),
174 }
175}
176
177fn build_remote_schema(result_set: &oracle::ResultSet<Row>) -> DFResult<RemoteSchema> {
178 let mut remote_fields = vec![];
179 for col in result_set.column_info() {
180 let remote_type = RemoteType::Oracle(oracle_type_to_remote_type(col.oracle_type())?);
181 remote_fields.push(RemoteField::new(col.name(), remote_type, col.nullable()));
182 }
183 Ok(RemoteSchema::new(remote_fields))
184}
185
/// Appends one primitive column value from an Oracle row to an Arrow builder.
///
/// Downcasts `$builder` to `$builder_ty`, reads column `$index` of `$row` as
/// `Option<$value_ty>`, runs present values through the fallible `$convert`
/// closure, and appends the result (or a null).
///
/// Panics if the builder type does not match the field's Arrow type — that is
/// a schema-construction bug, not a data error. Returns an `Execution` error
/// when the Oracle value cannot be read as `$value_ty`.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to {} for {:?} and {:?}",
                    stringify!($builder_ty),
                    $field,
                    $col
                )
            });
        let v = $row.get::<usize, Option<$value_ty>>($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        match v {
            Some(v) => builder.append_value($convert(v)?),
            None => builder.append_null(),
        }
    }};
}
214
/// Converts a chunk of Oracle rows into one Arrow `RecordBatch`.
///
/// Builders are allocated for every column of `table_schema` so indexes line
/// up with the Oracle result set, but values are appended only for columns in
/// `projection`; non-projected builders stay empty and are filtered out
/// before the batch is assembled. Returns `NotImplemented` for Arrow types
/// this mapping does not handle.
fn rows_to_batch(
    rows: &[Row],
    table_schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> DFResult<RecordBatch> {
    let projected_schema = project_schema(table_schema, projection)?;
    // One builder per column of the FULL table schema (projection applied later).
    let mut array_builders = vec![];
    for field in table_schema.fields() {
        let builder = make_builder(field.data_type(), rows.len());
        array_builders.push(builder);
    }

    for row in rows {
        for (idx, field) in table_schema.fields.iter().enumerate() {
            // Skip columns the caller did not project.
            if !projections_contains(projection, idx) {
                continue;
            }
            let builder = &mut array_builders[idx];
            // Column metadata is only used to enrich error messages below.
            let col = row.column_info().get(idx);
            match field.data_type() {
                DataType::Int16 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int16Builder,
                        i16,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Int32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int32Builder,
                        i32,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Int64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int64Builder,
                        i64,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Float32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float32Builder,
                        f32,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Float64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float64Builder,
                        f64,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Utf8 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        StringBuilder,
                        String,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::LargeUtf8 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        LargeStringBuilder,
                        String,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Utf8View => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        StringViewBuilder,
                        String,
                        row,
                        idx,
                        just_return
                    );
                }
                // NUMBER values arrive as strings and are re-parsed so the
                // i128 representation matches the field's declared scale.
                DataType::Decimal128(_precision, scale) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Decimal128Builder,
                        String,
                        row,
                        idx,
                        |v: String| {
                            let decimal = v.parse::<bigdecimal::BigDecimal>().map_err(|e| {
                                DataFusionError::Execution(format!(
                                    "Failed to parse BigDecimal from {v:?}: {e:?}",
                                ))
                            })?;
                            big_decimal_to_i128(&decimal, Some(*scale as i32))
                        }
                    );
                }
                // Timestamps are read as naive datetimes and interpreted as UTC
                // (`and_utc`) — NOTE(review): confirm session time zone handling.
                DataType::Timestamp(TimeUnit::Second, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampSecondBuilder,
                        chrono::NaiveDateTime,
                        row,
                        idx,
                        |v: chrono::NaiveDateTime| {
                            let t = v.and_utc().timestamp();
                            Ok::<_, DataFusionError>(t)
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        chrono::NaiveDateTime,
                        row,
                        idx,
                        |v: chrono::NaiveDateTime| {
                            // Nanosecond range is narrower than the datetime range;
                            // out-of-range values fail explicitly instead of wrapping.
                            v.and_utc().timestamp_nanos_opt().ok_or_else(|| {
                                DataFusionError::Execution(format!(
                                    "Failed to convert chrono::NaiveDateTime {v} to nanos timestamp"
                                ))
                            })
                        }
                    );
                }
                // Oracle DATE carries a time component, hence Date64 (millis).
                DataType::Date64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Date64Builder,
                        chrono::NaiveDateTime,
                        row,
                        idx,
                        |v: chrono::NaiveDateTime| {
                            Ok::<_, DataFusionError>(v.and_utc().timestamp_millis())
                        }
                    );
                }
                DataType::Boolean => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        BooleanBuilder,
                        bool,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Binary => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        BinaryBuilder,
                        Vec<u8>,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::LargeBinary => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        LargeBinaryBuilder,
                        Vec<u8>,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::BinaryView => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        BinaryViewBuilder,
                        Vec<u8>,
                        row,
                        idx,
                        just_return
                    );
                }
                // Struct columns map to Oracle objects (e.g. SDE geometry) and
                // are appended attribute-by-attribute.
                DataType::Struct(fields) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<StructBuilder>()
                        .unwrap_or_else(|| {
                            panic!("Failed to downcast builder to StructBuilder for {field:?} and {col:?}")
                        });
                    let object = row.get::<usize, Option<Object>>(idx).map_err(|e| {
                        DataFusionError::Execution(format!(
                            "Failed to get object for {field:?} and {col:?}: {e:?}"
                        ))
                    })?;
                    append_object_to_struct_builder(builder, fields, object)?;
                }
                _ => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "Unsupported data type {:?} for col: {:?}",
                        field.data_type(),
                        col
                    )));
                }
            }
        }
    }

    // Keep only the projected builders' finished arrays, in schema order.
    let projected_columns = array_builders
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| projections_contains(projection, *idx))
        .map(|(_, mut builder)| builder.finish())
        .collect::<Vec<ArrayRef>>();
    // Explicit row count keeps zero-column projections (e.g. COUNT(*)) correct.
    let options = RecordBatchOptions::new().with_row_count(Some(rows.len()));
    Ok(RecordBatch::try_new_with_options(
        projected_schema,
        projected_columns,
        &options,
    )?)
}
483
/// Reads attribute `$field.name()` from an optional Oracle `Object` and
/// appends it to child builder `$field_idx` of a `StructBuilder`.
///
/// Appends null when the whole object is absent or when the attribute value
/// itself is null; present values pass through the fallible `$convert`
/// closure first. Returns an `Execution` error when the child builder cannot
/// be fetched as `$field_builder_type` or the attribute cannot be read as
/// `$field_value_ty`.
macro_rules! append_object_attr {
    ($struct_builder:expr, $field_builder_type:ty, $field:expr, $field_idx:expr, $object_opt:expr, $field_value_ty:ty, $convert:expr) => {{
        let field_builder = $struct_builder
            .field_builder::<$field_builder_type>($field_idx)
            .ok_or_else(|| {
                DataFusionError::Execution(format!(
                    "Failed to get {} field builder for {:?}",
                    stringify!($field_builder_type),
                    $field,
                ))
            })?;
        match &$object_opt {
            Some(object) => {
                let field_name = $field.name();
                let field_value_opt =
                    object
                        .get::<Option<$field_value_ty>>(field_name)
                        .map_err(|e| {
                            DataFusionError::Execution(format!(
                                "Failed to get {} field value for {:?}: {e:?}",
                                stringify!($field_value_ty),
                                $field,
                            ))
                        })?;
                match field_value_opt {
                    Some(field_value) => {
                        field_builder.append_value($convert(field_value)?);
                    }
                    None => {
                        field_builder.append_null();
                    }
                }
            }
            None => {
                // Whole object missing: the child still needs a slot so its
                // length stays in sync with the parent struct.
                field_builder.append_null();
            }
        }
    }};
}
523
524fn append_object_to_struct_builder(
525 builder: &mut StructBuilder,
526 fields: &Fields,
527 object_opt: Option<Object>,
528) -> DFResult<()> {
529 for (idx, field) in fields.iter().enumerate() {
530 match field.data_type() {
531 DataType::Int32 => {
532 append_object_attr!(
533 builder,
534 Int32Builder,
535 field,
536 idx,
537 object_opt,
538 i32,
539 just_return
540 );
541 }
542 DataType::Int64 => {
543 append_object_attr!(
544 builder,
545 Int64Builder,
546 field,
547 idx,
548 object_opt,
549 i64,
550 just_return
551 );
552 }
553 DataType::Float32 => {
554 append_object_attr!(
555 builder,
556 Float32Builder,
557 field,
558 idx,
559 object_opt,
560 f32,
561 just_return
562 );
563 }
564 DataType::Float64 => {
565 append_object_attr!(
566 builder,
567 Float64Builder,
568 field,
569 idx,
570 object_opt,
571 f64,
572 just_return
573 );
574 }
575 DataType::Decimal128(_precision, scale) => {
576 append_object_attr!(
577 builder,
578 Decimal128Builder,
579 field,
580 idx,
581 object_opt,
582 String,
583 |v: String| {
584 let decimal = v.parse::<bigdecimal::BigDecimal>().map_err(|e| {
585 DataFusionError::Execution(format!(
586 "Failed to parse BigDecimal from {v:?}: {e:?}",
587 ))
588 })?;
589 big_decimal_to_i128(&decimal, Some(*scale as i32))
590 }
591 );
592 }
593 DataType::Binary => {
594 append_object_attr!(
595 builder,
596 BinaryBuilder,
597 field,
598 idx,
599 object_opt,
600 Vec<u8>,
601 just_return
602 );
603 }
604 DataType::LargeBinary => {
605 append_object_attr!(
606 builder,
607 LargeBinaryBuilder,
608 field,
609 idx,
610 object_opt,
611 Vec<u8>,
612 just_return
613 );
614 }
615 DataType::Struct(fields) => {
616 let field_builder =
617 builder.field_builder::<StructBuilder>(idx).ok_or_else(|| {
618 DataFusionError::Execution(format!(
619 "Failed to get struct field builder for {field:?}"
620 ))
621 })?;
622 match &object_opt {
623 Some(object) => {
624 let field_value =
625 object.get::<Option<Object>>(field.name()).map_err(|e| {
626 DataFusionError::Execution(format!(
627 "Failed to get object value for {field:?}: {e:?}"
628 ))
629 })?;
630 append_object_to_struct_builder(field_builder, fields, field_value)?;
631 }
632 None => {
633 field_builder.append_null();
634 }
635 }
636 }
637 _ => {
638 return Err(DataFusionError::NotImplemented(format!(
639 "Unsupported struct field type {}",
640 field.data_type(),
641 )));
642 }
643 }
644 }
645
646 match &object_opt {
647 Some(_) => {
648 builder.append(true);
649 }
650 None => {
651 builder.append_null();
652 }
653 }
654 Ok(())
655}