1use crate::connection::{RemoteDbType, just_return, projections_contains};
2use crate::utils::big_decimal_to_i128;
3use crate::{
4 Connection, ConnectionOptions, DFResult, Literalize, OracleConnectionOptions, OracleType, Pool,
5 PoolState, RemoteField, RemoteSchema, RemoteSchemaRef, RemoteSource, RemoteType,
6};
7use bb8_oracle::OracleConnectionManager;
8use datafusion::arrow::array::{
9 ArrayRef, BinaryBuilder, BooleanBuilder, Date64Builder, Decimal128Builder, Float32Builder,
10 Float64Builder, Int16Builder, Int32Builder, Int64Builder, LargeBinaryBuilder,
11 LargeStringBuilder, RecordBatch, RecordBatchOptions, StringBuilder, StructBuilder,
12 TimestampNanosecondBuilder, TimestampSecondBuilder, make_builder,
13};
14use datafusion::arrow::datatypes::{DataType, Fields, SchemaRef, TimeUnit};
15use datafusion::common::{DataFusionError, project_schema};
16use datafusion::execution::SendableRecordBatchStream;
17use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
18use futures::StreamExt;
19use log::debug;
20use oracle::sql_type::{Object, OracleType as ColumnType};
21use oracle::{Connector, Row};
22use std::any::Any;
23use std::sync::Arc;
24
/// Connection pool for Oracle, created by [`connect_oracle`].
#[derive(Debug)]
pub struct OraclePool {
    // bb8-managed pool; connections are produced by an OracleConnectionManager
    // built from a pre-validated Connector (see connect_oracle).
    pool: bb8::Pool<OracleConnectionManager>,
}
29
30pub(crate) async fn connect_oracle(options: &OracleConnectionOptions) -> DFResult<OraclePool> {
31 let connect_string = format!(
32 "//{}:{}/{}",
33 options.host, options.port, options.service_name
34 );
35 let connector = Connector::new(
36 options.username.clone(),
37 options.password.clone(),
38 connect_string,
39 );
40 let _ = connector
41 .connect()
42 .map_err(|e| DataFusionError::Internal(format!("Failed to connect to oracle: {e:?}")))?;
43 let manager = OracleConnectionManager::from_connector(connector);
44 let pool = bb8::Pool::builder()
45 .max_size(options.pool_max_size as u32)
46 .min_idle(Some(options.pool_min_idle as u32))
47 .idle_timeout(Some(options.pool_idle_timeout))
48 .reaper_rate(options.pool_ttl_check_interval)
49 .build(manager)
50 .await
51 .map_err(|e| DataFusionError::Internal(format!("Failed to create oracle pool: {e:?}")))?;
52 Ok(OraclePool { pool })
53}
54
55#[async_trait::async_trait]
56impl Pool for OraclePool {
57 async fn get(&self) -> DFResult<Arc<dyn Connection>> {
58 let conn = self.pool.get_owned().await.map_err(|e| {
59 DataFusionError::Execution(format!("Failed to get oracle connection due to {e:?}"))
60 })?;
61 Ok(Arc::new(OracleConnection { conn }))
62 }
63
64 async fn state(&self) -> DFResult<PoolState> {
65 let bb8_state = self.pool.state();
66 Ok(PoolState {
67 connections: bb8_state.connections as usize,
68 idle_connections: bb8_state.idle_connections as usize,
69 })
70 }
71}
72
/// A single checked-out Oracle connection implementing the crate's
/// [`Connection`] trait.
#[derive(Debug)]
pub struct OracleConnection {
    // Owned pooled connection; returned to the bb8 pool on drop.
    conn: bb8::PooledConnection<'static, OracleConnectionManager>,
}
77
#[async_trait::async_trait]
impl Connection for OracleConnection {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Infers the remote schema by executing `source` (narrowed to a single
    /// row where possible) and reading the result set's column metadata.
    async fn infer_schema(&self, source: &RemoteSource) -> DFResult<RemoteSchemaRef> {
        let sql = RemoteDbType::Oracle.limit_1_query_if_possible(source);
        let result_set = self.conn.query(&sql, &[]).map_err(|e| {
            DataFusionError::Plan(format!("Failed to execute query {sql} on oracle: {e:?}"))
        })?;
        let remote_schema = Arc::new(build_remote_schema(&result_set)?);
        Ok(remote_schema)
    }

    /// Executes `source` (rewritten with pushed-down filters and limit) and
    /// streams the result as arrow record batches of at most
    /// `stream_chunk_size` rows each.
    ///
    /// `table_schema` describes the full remote table; `projection` (indices
    /// into `table_schema`) selects which columns appear in emitted batches.
    async fn query(
        &self,
        conn_options: &ConnectionOptions,
        source: &RemoteSource,
        table_schema: SchemaRef,
        projection: Option<&Vec<usize>>,
        unparsed_filters: &[String],
        limit: Option<usize>,
    ) -> DFResult<SendableRecordBatchStream> {
        let projected_schema = project_schema(&table_schema, projection)?;

        let sql = RemoteDbType::Oracle.rewrite_query(source, unparsed_filters, limit);
        debug!("[remote-table] executing oracle query: {sql}");

        // Owned copies moved into the stream closure below.
        let projection = projection.cloned();
        let chunk_size = conn_options.stream_chunk_size();
        let result_set = self.conn.query(&sql, &[]).map_err(|e| {
            DataFusionError::Execution(format!("Failed to execute query on oracle: {e:?}"))
        })?;
        // NOTE(review): `result_set` is the driver's blocking row iterator;
        // rows are pulled lazily as the stream is polled — confirm this is
        // acceptable on the async runtime that polls this stream.
        let stream = futures::stream::iter(result_set).chunks(chunk_size).boxed();

        let stream = stream.map(move |rows| {
            // The first row-level driver error fails the whole chunk.
            let rows: Vec<Row> = rows
                .into_iter()
                .collect::<Result<Vec<_>, _>>()
                .map_err(|e| {
                    DataFusionError::Execution(format!(
                        "Failed to collect rows from oracle due to {e}",
                    ))
                })?;
            rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            projected_schema,
            stream,
        )))
    }

    /// Writing through an Oracle connection is not supported; always errors.
    async fn insert(
        &self,
        _conn_options: &ConnectionOptions,
        _literalizer: Arc<dyn Literalize>,
        _table: &[String],
        _remote_schema: RemoteSchemaRef,
        _input: SendableRecordBatchStream,
    ) -> DFResult<usize> {
        Err(DataFusionError::Execution(
            "Insert operation is not supported for oracle".to_string(),
        ))
    }
}
145
146fn oracle_type_to_remote_type(oracle_type: &ColumnType) -> DFResult<OracleType> {
147 match oracle_type {
148 ColumnType::Number(precision, scale) => {
149 let precision = if *precision == 0 { 38 } else { *precision };
151 let scale = if *scale == -127 { 0 } else { *scale };
152 Ok(OracleType::Number(precision, scale))
153 }
154 ColumnType::BinaryFloat => Ok(OracleType::BinaryFloat),
155 ColumnType::BinaryDouble => Ok(OracleType::BinaryDouble),
156 ColumnType::Float(precision) => Ok(OracleType::Float(*precision)),
157 ColumnType::Varchar2(size) => Ok(OracleType::Varchar2(*size)),
158 ColumnType::NVarchar2(size) => Ok(OracleType::NVarchar2(*size)),
159 ColumnType::Char(size) => Ok(OracleType::Char(*size)),
160 ColumnType::NChar(size) => Ok(OracleType::NChar(*size)),
161 ColumnType::Long => Ok(OracleType::Long),
162 ColumnType::CLOB => Ok(OracleType::Clob),
163 ColumnType::NCLOB => Ok(OracleType::NClob),
164 ColumnType::Raw(size) => Ok(OracleType::Raw(*size)),
165 ColumnType::LongRaw => Ok(OracleType::LongRaw),
166 ColumnType::BLOB => Ok(OracleType::Blob),
167 ColumnType::Date => Ok(OracleType::Date),
168 ColumnType::Timestamp(_) => Ok(OracleType::Timestamp),
169 ColumnType::Boolean => Ok(OracleType::Boolean),
170 ColumnType::Object(_) => Ok(OracleType::SdeGeometry),
171 _ => Err(DataFusionError::NotImplemented(format!(
172 "Unsupported oracle type: {oracle_type:?}",
173 ))),
174 }
175}
176
177fn build_remote_schema(result_set: &oracle::ResultSet<Row>) -> DFResult<RemoteSchema> {
178 let mut remote_fields = vec![];
179 for col in result_set.column_info() {
180 let remote_type = RemoteType::Oracle(oracle_type_to_remote_type(col.oracle_type())?);
181 remote_fields.push(RemoteField::new(col.name(), remote_type, col.nullable()));
182 }
183 Ok(RemoteSchema::new(remote_fields))
184}
185
/// Reads column `$index` of `$row` as `Option<$value_ty>`, converts a present
/// value with `$convert` (a fallible fn/closure), and appends the result (or a
/// null) to `$builder` after downcasting it to `$builder_ty`.
///
/// A failed downcast indicates a schema/builder mismatch (a programming
/// error) and panics; a failed driver read is surfaced as
/// `DataFusionError::Execution` via `?`, so this macro must be used inside a
/// function returning `DFResult`.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to {} for {:?} and {:?}",
                    stringify!($builder_ty),
                    $field,
                    $col
                )
            });
        let v = $row.get::<usize, Option<$value_ty>>($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        match v {
            Some(v) => builder.append_value($convert(v)?),
            None => builder.append_null(),
        }
    }};
}
214
/// Converts a chunk of Oracle rows into an arrow `RecordBatch` whose schema is
/// `table_schema` narrowed by `projection`.
///
/// Builders are allocated for every column of `table_schema` (keeping builder
/// indices aligned with row column indices), but only projected columns
/// receive values and make it into the output batch. The explicit row count
/// option keeps empty-projection batches well-formed.
fn rows_to_batch(
    rows: &[Row],
    table_schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> DFResult<RecordBatch> {
    let projected_schema = project_schema(table_schema, projection)?;
    let mut array_builders = vec![];
    for field in table_schema.fields() {
        let builder = make_builder(field.data_type(), rows.len());
        array_builders.push(builder);
    }

    for row in rows {
        for (idx, field) in table_schema.fields.iter().enumerate() {
            // Skip columns the caller did not project; their builders stay empty.
            if !projections_contains(projection, idx) {
                continue;
            }
            let builder = &mut array_builders[idx];
            // Column metadata is only used to enrich error/panic messages.
            let col = row.column_info().get(idx);
            match field.data_type() {
                DataType::Int16 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int16Builder,
                        i16,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Int32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int32Builder,
                        i32,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Int64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Int64Builder,
                        i64,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Float32 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float32Builder,
                        f32,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Float64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Float64Builder,
                        f64,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Utf8 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        StringBuilder,
                        String,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::LargeUtf8 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        LargeStringBuilder,
                        String,
                        row,
                        idx,
                        just_return
                    );
                }
                // Decimals are fetched as strings to avoid lossy driver-side
                // float conversion, then parsed and rescaled to i128.
                DataType::Decimal128(_precision, scale) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Decimal128Builder,
                        String,
                        row,
                        idx,
                        |v: String| {
                            let decimal = v.parse::<bigdecimal::BigDecimal>().map_err(|e| {
                                DataFusionError::Execution(format!(
                                    "Failed to parse BigDecimal from {v:?}: {e:?}",
                                ))
                            })?;
                            big_decimal_to_i128(&decimal, Some(*scale as i32))
                        }
                    );
                }
                // NOTE(review): naive datetimes are interpreted as UTC for
                // the epoch conversion below — confirm session time zone
                // semantics match.
                DataType::Timestamp(TimeUnit::Second, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampSecondBuilder,
                        chrono::NaiveDateTime,
                        row,
                        idx,
                        |v: chrono::NaiveDateTime| {
                            let t = v.and_utc().timestamp();
                            Ok::<_, DataFusionError>(t)
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        chrono::NaiveDateTime,
                        row,
                        idx,
                        |v: chrono::NaiveDateTime| {
                            // timestamp_nanos_opt is None when the value
                            // overflows the i64 nanosecond range.
                            v.and_utc().timestamp_nanos_opt().ok_or_else(|| {
                                DataFusionError::Execution(format!(
                                    "Failed to convert chrono::NaiveDateTime {v} to nanos timestamp"
                                ))
                            })
                        }
                    );
                }
                DataType::Date64 => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Date64Builder,
                        chrono::NaiveDateTime,
                        row,
                        idx,
                        |v: chrono::NaiveDateTime| {
                            Ok::<_, DataFusionError>(v.and_utc().timestamp_millis())
                        }
                    );
                }
                DataType::Boolean => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        BooleanBuilder,
                        bool,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::Binary => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        BinaryBuilder,
                        Vec<u8>,
                        row,
                        idx,
                        just_return
                    );
                }
                DataType::LargeBinary => {
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        LargeBinaryBuilder,
                        Vec<u8>,
                        row,
                        idx,
                        just_return
                    );
                }
                // Oracle OBJECT columns map to arrow structs; the object's
                // attributes are appended field-by-field.
                DataType::Struct(fields) => {
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<StructBuilder>()
                        .unwrap_or_else(|| {
                            panic!("Failed to downcast builder to StructBuilder for {field:?} and {col:?}")
                        });
                    let object = row.get::<usize, Option<Object>>(idx).map_err(|e| {
                        DataFusionError::Execution(format!(
                            "Failed to get object for {field:?} and {col:?}: {e:?}"
                        ))
                    })?;
                    append_object_to_struct_builder(builder, fields, object)?;
                }
                _ => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "Unsupported data type {:?} for col: {:?}",
                        field.data_type(),
                        col
                    )));
                }
            }
        }
    }

    // Keep only the projected builders, in schema order.
    let projected_columns = array_builders
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| projections_contains(projection, *idx))
        .map(|(_, mut builder)| builder.finish())
        .collect::<Vec<ArrayRef>>();
    let options = RecordBatchOptions::new().with_row_count(Some(rows.len()));
    Ok(RecordBatch::try_new_with_options(
        projected_schema,
        projected_columns,
        &options,
    )?)
}
459
/// Appends one attribute of an Oracle `Object` to field `$field_idx` of a
/// `StructBuilder`.
///
/// The attribute is looked up by `$field`'s name as `Option<$field_value_ty>`
/// and converted with the fallible `$convert` before appending. When the whole
/// object (`$object_opt`) is `None`, or the attribute itself is null, a null
/// is appended instead — every invocation appends exactly one entry to the
/// field builder, keeping child builder lengths in sync. Errors surface as
/// `DataFusionError::Execution` via `?`.
macro_rules! append_object_attr {
    ($struct_builder:expr, $field_builder_type:ty, $field:expr, $field_idx:expr, $object_opt:expr, $field_value_ty:ty, $convert:expr) => {{
        let field_builder = $struct_builder
            .field_builder::<$field_builder_type>($field_idx)
            .ok_or_else(|| {
                DataFusionError::Execution(format!(
                    "Failed to get {} field builder for {:?}",
                    stringify!($field_builder_type),
                    $field,
                ))
            })?;
        match &$object_opt {
            Some(object) => {
                let field_name = $field.name();
                let field_value_opt =
                    object
                        .get::<Option<$field_value_ty>>(field_name)
                        .map_err(|e| {
                            DataFusionError::Execution(format!(
                                "Failed to get {} field value for {:?}: {e:?}",
                                stringify!($field_value_ty),
                                $field,
                            ))
                        })?;
                match field_value_opt {
                    Some(field_value) => {
                        field_builder.append_value($convert(field_value)?);
                    }
                    None => {
                        field_builder.append_null();
                    }
                }
            }
            None => {
                field_builder.append_null();
            }
        }
    }};
}
499
500fn append_object_to_struct_builder(
501 builder: &mut StructBuilder,
502 fields: &Fields,
503 object_opt: Option<Object>,
504) -> DFResult<()> {
505 for (idx, field) in fields.iter().enumerate() {
506 match field.data_type() {
507 DataType::Int32 => {
508 append_object_attr!(
509 builder,
510 Int32Builder,
511 field,
512 idx,
513 object_opt,
514 i32,
515 just_return
516 );
517 }
518 DataType::Int64 => {
519 append_object_attr!(
520 builder,
521 Int64Builder,
522 field,
523 idx,
524 object_opt,
525 i64,
526 just_return
527 );
528 }
529 DataType::Float32 => {
530 append_object_attr!(
531 builder,
532 Float32Builder,
533 field,
534 idx,
535 object_opt,
536 f32,
537 just_return
538 );
539 }
540 DataType::Float64 => {
541 append_object_attr!(
542 builder,
543 Float64Builder,
544 field,
545 idx,
546 object_opt,
547 f64,
548 just_return
549 );
550 }
551 DataType::Decimal128(_precision, scale) => {
552 append_object_attr!(
553 builder,
554 Decimal128Builder,
555 field,
556 idx,
557 object_opt,
558 String,
559 |v: String| {
560 let decimal = v.parse::<bigdecimal::BigDecimal>().map_err(|e| {
561 DataFusionError::Execution(format!(
562 "Failed to parse BigDecimal from {v:?}: {e:?}",
563 ))
564 })?;
565 big_decimal_to_i128(&decimal, Some(*scale as i32))
566 }
567 );
568 }
569 DataType::Binary => {
570 append_object_attr!(
571 builder,
572 BinaryBuilder,
573 field,
574 idx,
575 object_opt,
576 Vec<u8>,
577 just_return
578 );
579 }
580 DataType::LargeBinary => {
581 append_object_attr!(
582 builder,
583 LargeBinaryBuilder,
584 field,
585 idx,
586 object_opt,
587 Vec<u8>,
588 just_return
589 );
590 }
591 DataType::Struct(fields) => {
592 let field_builder =
593 builder.field_builder::<StructBuilder>(idx).ok_or_else(|| {
594 DataFusionError::Execution(format!(
595 "Failed to get struct field builder for {field:?}"
596 ))
597 })?;
598 match &object_opt {
599 Some(object) => {
600 let field_value =
601 object.get::<Option<Object>>(field.name()).map_err(|e| {
602 DataFusionError::Execution(format!(
603 "Failed to get object value for {field:?}: {e:?}"
604 ))
605 })?;
606 append_object_to_struct_builder(field_builder, fields, field_value)?;
607 }
608 None => {
609 field_builder.append_null();
610 }
611 }
612 }
613 _ => {
614 return Err(DataFusionError::NotImplemented(format!(
615 "Unsupported struct field type {}",
616 field.data_type(),
617 )));
618 }
619 }
620 }
621
622 match &object_opt {
623 Some(_) => {
624 builder.append(true);
625 }
626 None => {
627 builder.append_null();
628 }
629 }
630 Ok(())
631}