1use crate::connection::{RemoteDbType, just_return, projections_contains};
2use crate::utils::big_decimal_to_i128;
3use crate::{
4 Connection, ConnectionOptions, DFResult, Literalize, OracleConnectionOptions, OracleType, Pool,
5 RemoteField, RemoteSchema, RemoteSchemaRef, RemoteSource, RemoteType,
6};
7use bb8_oracle::OracleConnectionManager;
8use datafusion::arrow::array::{
9 ArrayRef, BinaryBuilder, BooleanBuilder, Date64Builder, Decimal128Builder, Float32Builder,
10 Float64Builder, Int16Builder, Int32Builder, Int64Builder, LargeBinaryBuilder,
11 LargeStringBuilder, RecordBatch, RecordBatchOptions, StringBuilder, StructBuilder,
12 TimestampNanosecondBuilder, TimestampSecondBuilder, make_builder,
13};
14use datafusion::arrow::datatypes::{DataType, Fields, SchemaRef, TimeUnit};
15use datafusion::common::{DataFusionError, project_schema};
16use datafusion::execution::SendableRecordBatchStream;
17use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
18use futures::StreamExt;
19use log::debug;
20use oracle::sql_type::{Object, OracleType as ColumnType};
21use oracle::{Connector, Row};
22use std::any::Any;
23use std::sync::Arc;
24
/// A bb8-backed connection pool for Oracle, created via [`connect_oracle`].
#[derive(Debug)]
pub struct OraclePool {
    // Underlying bb8 pool of connections managed by `OracleConnectionManager`.
    pool: bb8::Pool<OracleConnectionManager>,
}
29
30pub(crate) async fn connect_oracle(options: &OracleConnectionOptions) -> DFResult<OraclePool> {
31 let connect_string = format!(
32 "//{}:{}/{}",
33 options.host, options.port, options.service_name
34 );
35 let connector = Connector::new(
36 options.username.clone(),
37 options.password.clone(),
38 connect_string,
39 );
40 let _ = connector
41 .connect()
42 .map_err(|e| DataFusionError::Internal(format!("Failed to connect to oracle: {e:?}")))?;
43 let manager = OracleConnectionManager::from_connector(connector);
44 let pool = bb8::Pool::builder()
45 .max_size(options.pool_max_size as u32)
46 .build(manager)
47 .await
48 .map_err(|e| DataFusionError::Internal(format!("Failed to create oracle pool: {e:?}")))?;
49 Ok(OraclePool { pool })
50}
51
52#[async_trait::async_trait]
53impl Pool for OraclePool {
54 async fn get(&self) -> DFResult<Arc<dyn Connection>> {
55 let conn = self.pool.get_owned().await.map_err(|e| {
56 DataFusionError::Execution(format!("Failed to get oracle connection due to {e:?}"))
57 })?;
58 Ok(Arc::new(OracleConnection { conn }))
59 }
60}
61
/// A single Oracle connection checked out of an [`OraclePool`].
#[derive(Debug)]
pub struct OracleConnection {
    // Owned (`'static`) pooled connection; returned to the pool on drop.
    conn: bb8::PooledConnection<'static, OracleConnectionManager>,
}
66
67#[async_trait::async_trait]
68impl Connection for OracleConnection {
69 fn as_any(&self) -> &dyn Any {
70 self
71 }
72
73 async fn infer_schema(&self, source: &RemoteSource) -> DFResult<RemoteSchemaRef> {
74 let sql = RemoteDbType::Oracle.limit_1_query_if_possible(source);
75 let result_set = self.conn.query(&sql, &[]).map_err(|e| {
76 DataFusionError::Plan(format!("Failed to execute query {sql} on oracle: {e:?}"))
77 })?;
78 let remote_schema = Arc::new(build_remote_schema(&result_set)?);
79 Ok(remote_schema)
80 }
81
82 async fn query(
83 &self,
84 conn_options: &ConnectionOptions,
85 source: &RemoteSource,
86 table_schema: SchemaRef,
87 projection: Option<&Vec<usize>>,
88 unparsed_filters: &[String],
89 limit: Option<usize>,
90 ) -> DFResult<SendableRecordBatchStream> {
91 let projected_schema = project_schema(&table_schema, projection)?;
92
93 let sql = RemoteDbType::Oracle.rewrite_query(source, unparsed_filters, limit);
94 debug!("[remote-table] executing oracle query: {sql}");
95
96 let projection = projection.cloned();
97 let chunk_size = conn_options.stream_chunk_size();
98 let result_set = self.conn.query(&sql, &[]).map_err(|e| {
99 DataFusionError::Execution(format!("Failed to execute query on oracle: {e:?}"))
100 })?;
101 let stream = futures::stream::iter(result_set).chunks(chunk_size).boxed();
102
103 let stream = stream.map(move |rows| {
104 let rows: Vec<Row> = rows
105 .into_iter()
106 .collect::<Result<Vec<_>, _>>()
107 .map_err(|e| {
108 DataFusionError::Execution(format!(
109 "Failed to collect rows from oracle due to {e}",
110 ))
111 })?;
112 rows_to_batch(rows.as_slice(), &table_schema, projection.as_ref())
113 });
114
115 Ok(Box::pin(RecordBatchStreamAdapter::new(
116 projected_schema,
117 stream,
118 )))
119 }
120
121 async fn insert(
122 &self,
123 _conn_options: &ConnectionOptions,
124 _literalizer: Arc<dyn Literalize>,
125 _table: &[String],
126 _remote_schema: RemoteSchemaRef,
127 _input: SendableRecordBatchStream,
128 ) -> DFResult<usize> {
129 Err(DataFusionError::Execution(
130 "Insert operation is not supported for oracle".to_string(),
131 ))
132 }
133}
134
135fn oracle_type_to_remote_type(oracle_type: &ColumnType) -> DFResult<OracleType> {
136 match oracle_type {
137 ColumnType::Number(precision, scale) => {
138 let precision = if *precision == 0 { 38 } else { *precision };
140 let scale = if *scale == -127 { 0 } else { *scale };
141 Ok(OracleType::Number(precision, scale))
142 }
143 ColumnType::BinaryFloat => Ok(OracleType::BinaryFloat),
144 ColumnType::BinaryDouble => Ok(OracleType::BinaryDouble),
145 ColumnType::Float(precision) => Ok(OracleType::Float(*precision)),
146 ColumnType::Varchar2(size) => Ok(OracleType::Varchar2(*size)),
147 ColumnType::NVarchar2(size) => Ok(OracleType::NVarchar2(*size)),
148 ColumnType::Char(size) => Ok(OracleType::Char(*size)),
149 ColumnType::NChar(size) => Ok(OracleType::NChar(*size)),
150 ColumnType::Long => Ok(OracleType::Long),
151 ColumnType::CLOB => Ok(OracleType::Clob),
152 ColumnType::NCLOB => Ok(OracleType::NClob),
153 ColumnType::Raw(size) => Ok(OracleType::Raw(*size)),
154 ColumnType::LongRaw => Ok(OracleType::LongRaw),
155 ColumnType::BLOB => Ok(OracleType::Blob),
156 ColumnType::Date => Ok(OracleType::Date),
157 ColumnType::Timestamp(_) => Ok(OracleType::Timestamp),
158 ColumnType::Boolean => Ok(OracleType::Boolean),
159 ColumnType::Object(_) => Ok(OracleType::SdeGeometry),
160 _ => Err(DataFusionError::NotImplemented(format!(
161 "Unsupported oracle type: {oracle_type:?}",
162 ))),
163 }
164}
165
166fn build_remote_schema(result_set: &oracle::ResultSet<Row>) -> DFResult<RemoteSchema> {
167 let mut remote_fields = vec![];
168 for col in result_set.column_info() {
169 let remote_type = RemoteType::Oracle(oracle_type_to_remote_type(col.oracle_type())?);
170 remote_fields.push(RemoteField::new(col.name(), remote_type, col.nullable()));
171 }
172 Ok(RemoteSchema::new(remote_fields))
173}
174
/// Downcast `$builder` to the concrete `$builder_ty`, fetch column `$index`
/// of `$row` as `Option<$value_ty>`, run the value through the fallible
/// `$convert`, and append the result (or a null for SQL NULL) to the builder.
///
/// `$field` and `$col` are used only to enrich panic/error messages.
/// Panics if the builder's concrete type does not match `$builder_ty`: a
/// schema/builder mismatch is a programming error, not a data error.
macro_rules! handle_primitive_type {
    ($builder:expr, $field:expr, $col:expr, $builder_ty:ty, $value_ty:ty, $row:expr, $index:expr, $convert:expr) => {{
        let builder = $builder
            .as_any_mut()
            .downcast_mut::<$builder_ty>()
            .unwrap_or_else(|| {
                panic!(
                    "Failed to downcast builder to {} for {:?} and {:?}",
                    stringify!($builder_ty),
                    $field,
                    $col
                )
            });
        // SQL NULL arrives as `None` and becomes a null entry in the array.
        let v = $row.get::<usize, Option<$value_ty>>($index).map_err(|e| {
            DataFusionError::Execution(format!(
                "Failed to get {} value for {:?} and {:?}: {e:?}",
                stringify!($value_ty),
                $field,
                $col
            ))
        })?;

        match v {
            Some(v) => builder.append_value($convert(v)?),
            None => builder.append_null(),
        }
    }};
}
203
/// Convert a chunk of Oracle rows into a single Arrow [`RecordBatch`].
///
/// Builders are allocated for every field of `table_schema` so that column
/// index `idx` lines up with the row's columns, but values are appended only
/// for columns selected by `projection`, and only those builders are finished
/// into the returned batch (which matches the projected schema).
///
/// # Errors
/// Returns an error when a value cannot be fetched or converted, or when a
/// field's Arrow data type has no mapping below.
fn rows_to_batch(
    rows: &[Row],
    table_schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> DFResult<RecordBatch> {
    let projected_schema = project_schema(table_schema, projection)?;
    // One builder per full-schema column; unprojected builders stay empty and
    // are filtered out at the end.
    let mut array_builders = vec![];
    for field in table_schema.fields() {
        let builder = make_builder(field.data_type(), rows.len());
        array_builders.push(builder);
    }

    for row in rows {
        for (idx, field) in table_schema.fields.iter().enumerate() {
            // Skip columns the caller did not project.
            if !projections_contains(projection, idx) {
                continue;
            }
            let builder = &mut array_builders[idx];
            // Column metadata is used only to enrich error messages.
            let col = row.column_info().get(idx);
            match field.data_type() {
                DataType::Int16 => {
                    handle_primitive_type!(builder, field, col, Int16Builder, i16, row, idx, just_return);
                }
                DataType::Int32 => {
                    handle_primitive_type!(builder, field, col, Int32Builder, i32, row, idx, just_return);
                }
                DataType::Int64 => {
                    handle_primitive_type!(builder, field, col, Int64Builder, i64, row, idx, just_return);
                }
                DataType::Float32 => {
                    handle_primitive_type!(builder, field, col, Float32Builder, f32, row, idx, just_return);
                }
                DataType::Float64 => {
                    handle_primitive_type!(builder, field, col, Float64Builder, f64, row, idx, just_return);
                }
                DataType::Utf8 => {
                    handle_primitive_type!(builder, field, col, StringBuilder, String, row, idx, just_return);
                }
                DataType::LargeUtf8 => {
                    handle_primitive_type!(builder, field, col, LargeStringBuilder, String, row, idx, just_return);
                }
                DataType::Decimal128(_precision, scale) => {
                    // Fetch NUMBER as a string, then parse and rescale to i128
                    // at the field's declared scale.
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Decimal128Builder,
                        String,
                        row,
                        idx,
                        |v: String| {
                            let decimal = v.parse::<bigdecimal::BigDecimal>().map_err(|e| {
                                DataFusionError::Execution(format!(
                                    "Failed to parse BigDecimal from {v:?}: {e:?}",
                                ))
                            })?;
                            big_decimal_to_i128(&decimal, Some(*scale as i32))
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Second, None) => {
                    // Naive datetimes are interpreted as UTC when converting
                    // to epoch seconds.
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampSecondBuilder,
                        chrono::NaiveDateTime,
                        row,
                        idx,
                        |v: chrono::NaiveDateTime| {
                            let t = v.and_utc().timestamp();
                            Ok::<_, DataFusionError>(t)
                        }
                    );
                }
                DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                    // `timestamp_nanos_opt` is None when the instant overflows
                    // the i64 nanosecond range; surface that as an error.
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        TimestampNanosecondBuilder,
                        chrono::NaiveDateTime,
                        row,
                        idx,
                        |v: chrono::NaiveDateTime| {
                            v.and_utc().timestamp_nanos_opt().ok_or_else(|| {
                                DataFusionError::Execution(format!(
                                    "Failed to convert chrono::NaiveDateTime {v} to nanos timestamp"
                                ))
                            })
                        }
                    );
                }
                DataType::Date64 => {
                    // Date64 stores epoch milliseconds.
                    handle_primitive_type!(
                        builder,
                        field,
                        col,
                        Date64Builder,
                        chrono::NaiveDateTime,
                        row,
                        idx,
                        |v: chrono::NaiveDateTime| {
                            Ok::<_, DataFusionError>(v.and_utc().timestamp_millis())
                        }
                    );
                }
                DataType::Boolean => {
                    handle_primitive_type!(builder, field, col, BooleanBuilder, bool, row, idx, just_return);
                }
                DataType::Binary => {
                    handle_primitive_type!(builder, field, col, BinaryBuilder, Vec<u8>, row, idx, just_return);
                }
                DataType::LargeBinary => {
                    handle_primitive_type!(builder, field, col, LargeBinaryBuilder, Vec<u8>, row, idx, just_return);
                }
                DataType::Struct(fields) => {
                    // Object columns map to Arrow structs; delegation handles
                    // nested nulls and recursion.
                    let builder = builder
                        .as_any_mut()
                        .downcast_mut::<StructBuilder>()
                        .unwrap_or_else(|| {
                            panic!("Failed to downcast builder to StructBuilder for {field:?} and {col:?}")
                        });
                    let object = row.get::<usize, Option<Object>>(idx).map_err(|e| {
                        DataFusionError::Execution(format!(
                            "Failed to get object for {field:?} and {col:?}: {e:?}"
                        ))
                    })?;
                    append_object_to_struct_builder(builder, fields, object)?;
                }
                _ => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "Unsupported data type {:?} for col: {:?}",
                        field.data_type(),
                        col
                    )));
                }
            }
        }
    }

    // Finish only the projected builders, preserving schema order.
    let projected_columns = array_builders
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| projections_contains(projection, *idx))
        .map(|(_, mut builder)| builder.finish())
        .collect::<Vec<ArrayRef>>();
    // An explicit row count keeps the batch valid even when no columns are
    // projected.
    let options = RecordBatchOptions::new().with_row_count(Some(rows.len()));
    Ok(RecordBatch::try_new_with_options(
        projected_schema,
        projected_columns,
        &options,
    )?)
}
448
/// Append one attribute of an Oracle object to a struct's child builder.
///
/// Looks up child builder `$field_idx` of `$struct_builder` as
/// `$field_builder_type`, then:
/// - if `$object_opt` is `None` (the whole object is NULL), appends a null;
/// - otherwise reads attribute `$field.name()` as `Option<$field_value_ty>`
///   and appends `$convert(value)?`, or a null for a NULL attribute.
///
/// Exactly one entry is appended to the child builder per expansion, keeping
/// its length in sync with the parent struct's validity buffer.
macro_rules! append_object_attr {
    ($struct_builder:expr, $field_builder_type:ty, $field:expr, $field_idx:expr, $object_opt:expr, $field_value_ty:ty, $convert:expr) => {{
        let field_builder = $struct_builder
            .field_builder::<$field_builder_type>($field_idx)
            .ok_or_else(|| {
                DataFusionError::Execution(format!(
                    "Failed to get {} field builder for {:?}",
                    stringify!($field_builder_type),
                    $field,
                ))
            })?;
        match &$object_opt {
            Some(object) => {
                // Object attributes are looked up by the Arrow field name.
                let field_name = $field.name();
                let field_value_opt =
                    object
                        .get::<Option<$field_value_ty>>(field_name)
                        .map_err(|e| {
                            DataFusionError::Execution(format!(
                                "Failed to get {} field value for {:?}: {e:?}",
                                stringify!($field_value_ty),
                                $field,
                            ))
                        })?;
                match field_value_opt {
                    Some(field_value) => {
                        field_builder.append_value($convert(field_value)?);
                    }
                    None => {
                        field_builder.append_null();
                    }
                }
            }
            None => {
                // Whole object is NULL: still append so child lengths match.
                field_builder.append_null();
            }
        }
    }};
}
488
489fn append_object_to_struct_builder(
490 builder: &mut StructBuilder,
491 fields: &Fields,
492 object_opt: Option<Object>,
493) -> DFResult<()> {
494 for (idx, field) in fields.iter().enumerate() {
495 match field.data_type() {
496 DataType::Int32 => {
497 append_object_attr!(
498 builder,
499 Int32Builder,
500 field,
501 idx,
502 object_opt,
503 i32,
504 just_return
505 );
506 }
507 DataType::Int64 => {
508 append_object_attr!(
509 builder,
510 Int64Builder,
511 field,
512 idx,
513 object_opt,
514 i64,
515 just_return
516 );
517 }
518 DataType::Float32 => {
519 append_object_attr!(
520 builder,
521 Float32Builder,
522 field,
523 idx,
524 object_opt,
525 f32,
526 just_return
527 );
528 }
529 DataType::Float64 => {
530 append_object_attr!(
531 builder,
532 Float64Builder,
533 field,
534 idx,
535 object_opt,
536 f64,
537 just_return
538 );
539 }
540 DataType::Decimal128(_precision, scale) => {
541 append_object_attr!(
542 builder,
543 Decimal128Builder,
544 field,
545 idx,
546 object_opt,
547 String,
548 |v: String| {
549 let decimal = v.parse::<bigdecimal::BigDecimal>().map_err(|e| {
550 DataFusionError::Execution(format!(
551 "Failed to parse BigDecimal from {v:?}: {e:?}",
552 ))
553 })?;
554 big_decimal_to_i128(&decimal, Some(*scale as i32))
555 }
556 );
557 }
558 DataType::Binary => {
559 append_object_attr!(
560 builder,
561 BinaryBuilder,
562 field,
563 idx,
564 object_opt,
565 Vec<u8>,
566 just_return
567 );
568 }
569 DataType::LargeBinary => {
570 append_object_attr!(
571 builder,
572 LargeBinaryBuilder,
573 field,
574 idx,
575 object_opt,
576 Vec<u8>,
577 just_return
578 );
579 }
580 DataType::Struct(fields) => {
581 let field_builder =
582 builder.field_builder::<StructBuilder>(idx).ok_or_else(|| {
583 DataFusionError::Execution(format!(
584 "Failed to get struct field builder for {field:?}"
585 ))
586 })?;
587 match &object_opt {
588 Some(object) => {
589 let field_value =
590 object.get::<Option<Object>>(field.name()).map_err(|e| {
591 DataFusionError::Execution(format!(
592 "Failed to get object value for {field:?}: {e:?}"
593 ))
594 })?;
595 append_object_to_struct_builder(field_builder, fields, field_value)?;
596 }
597 None => {
598 field_builder.append_null();
599 }
600 }
601 }
602 _ => {
603 return Err(DataFusionError::NotImplemented(format!(
604 "Unsupported struct field type {}",
605 field.data_type(),
606 )));
607 }
608 }
609 }
610
611 match &object_opt {
612 Some(_) => {
613 builder.append(true);
614 }
615 None => {
616 builder.append_null();
617 }
618 }
619 Ok(())
620}