1use crate::error::{ConversionError, QueryError};
7use crate::transport::messages::{ColumnInfo, ResultData, ResultSetHandle};
8use crate::transport::protocol::QueryResult as TransportQueryResult;
9use crate::transport::TransportProtocol;
10use crate::types::TypeMapper;
11use arrow::array::RecordBatch;
12use arrow::datatypes::{Field, Schema};
13use std::sync::Arc;
14use tokio::sync::Mutex;
15
/// Descriptive information about a query result: its Arrow schema plus
/// optional row-count and timing figures.
#[derive(Debug, Clone)]
pub struct QueryMetadata {
    /// Arrow schema describing the result columns.
    pub schema: Arc<Schema>,
    /// Total number of rows reported for the result, when known.
    pub total_rows: Option<i64>,
    /// Number of columns; `QueryMetadata::new` sets this to the schema's field count.
    pub column_count: usize,
    /// Execution time in milliseconds, when it has been recorded
    /// (see `with_execution_time`).
    pub execution_time_ms: Option<u64>,
}
28
29impl QueryMetadata {
30 pub fn new(schema: Arc<Schema>, total_rows: Option<i64>) -> Self {
32 Self {
33 column_count: schema.fields().len(),
34 schema,
35 total_rows,
36 execution_time_ms: None,
37 }
38 }
39
40 pub fn with_execution_time(mut self, execution_time_ms: u64) -> Self {
42 self.execution_time_ms = Some(execution_time_ms);
43 self
44 }
45
46 pub fn column_names(&self) -> Vec<&str> {
48 self.schema
49 .fields()
50 .iter()
51 .map(|f| f.name().as_str())
52 .collect()
53 }
54
55 pub fn column_types(&self) -> Vec<&arrow::datatypes::DataType> {
57 self.schema.fields().iter().map(|f| f.data_type()).collect()
58 }
59}
60
/// Result of a query: either a bare row count or a set of record batches,
/// paired with the transport that produced it.
pub struct ResultSet {
    /// Row-count or stream state of this result.
    inner: ResultSetInner,
    /// Shared transport; `fetch_all` uses it to fetch further data and to
    /// close the result-set handle.
    transport: Arc<Mutex<dyn TransportProtocol>>,
}
68
69impl std::fmt::Debug for ResultSet {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct("ResultSet")
72 .field("inner", &self.inner)
73 .field("transport", &"<TransportProtocol>")
74 .finish()
75 }
76}
77
/// Internal state of a [`ResultSet`].
#[derive(Debug)]
enum ResultSetInner {
    /// The result carries only a row count.
    RowCount { count: i64 },
    /// The result carries row data.
    Stream {
        /// Handle returned by the transport, if any; used to fetch more data
        /// and to close the result set.
        handle: Option<ResultSetHandle>,
        /// Schema and row-count information for the result.
        metadata: QueryMetadata,
        /// Record batches converted so far.
        batches: Vec<RecordBatch>,
        /// Whether all rows are considered to have been received.
        complete: bool,
    },
}
94
95impl ResultSet {
96 pub(crate) fn from_transport_result(
98 result: TransportQueryResult,
99 transport: Arc<Mutex<dyn TransportProtocol>>,
100 ) -> Result<Self, QueryError> {
101 match result {
102 TransportQueryResult::RowCount { count } => Ok(Self {
103 inner: ResultSetInner::RowCount { count },
104 transport,
105 }),
106 TransportQueryResult::ResultSet { handle, data } => {
107 let schema = Self::build_schema(&data.columns)
108 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
109
110 let metadata = QueryMetadata::new(Arc::clone(&schema), Some(data.total_rows));
111
112 let batches = if !data.data.is_empty() {
115 vec![Self::column_major_to_record_batch(&data, &schema)
116 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?]
117 } else {
118 Vec::new()
119 };
120
121 let num_rows_received = if data.data.is_empty() {
124 0
125 } else {
126 data.data[0].len() as i64
127 };
128 let complete = handle.is_none() || data.total_rows == num_rows_received;
129
130 Ok(Self {
131 inner: ResultSetInner::Stream {
132 handle, metadata,
134 batches,
135 complete,
136 },
137 transport,
138 })
139 }
140 }
141 }
142
143 pub fn row_count(&self) -> Option<i64> {
145 match &self.inner {
146 ResultSetInner::RowCount { count } => Some(*count),
147 _ => None,
148 }
149 }
150
151 pub fn metadata(&self) -> Option<&QueryMetadata> {
153 match &self.inner {
154 ResultSetInner::Stream { metadata, .. } => Some(metadata),
155 _ => None,
156 }
157 }
158
159 pub fn is_stream(&self) -> bool {
161 matches!(&self.inner, ResultSetInner::Stream { .. })
162 }
163
164 pub fn into_iterator(self) -> Result<ResultSetIterator, QueryError> {
169 match self.inner {
170 ResultSetInner::Stream {
171 handle,
172 metadata,
173 batches,
174 complete,
175 } => Ok(ResultSetIterator {
176 handle,
177 transport: self.transport,
178 metadata,
179 batches,
180 current_index: 0,
181 complete,
182 }),
183 ResultSetInner::RowCount { .. } => Err(QueryError::NoResultSet(
184 "Cannot iterate over row count result".to_string(),
185 )),
186 }
187 }
188
189 pub async fn fetch_all(mut self) -> Result<Vec<RecordBatch>, QueryError> {
194 match &mut self.inner {
195 ResultSetInner::Stream {
196 handle,
197 metadata,
198 batches,
199 complete,
200 } => {
201 let all_batches = if *complete {
202 batches.clone()
203 } else {
204 let mut all_batches = batches.clone();
205
206 if let Some(handle_val) = handle {
208 loop {
209 let mut transport = self.transport.lock().await;
210 let result_data = transport
211 .fetch_results(*handle_val)
212 .await
213 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
214
215 if result_data.data.is_empty() {
216 *complete = true;
217 break;
218 }
219
220 let batch =
221 Self::column_major_to_record_batch(&result_data, &metadata.schema)
222 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
223
224 all_batches.push(batch);
225
226 if result_data.total_rows > 0
228 && all_batches.iter().map(|b| b.num_rows()).sum::<usize>()
229 >= result_data.total_rows as usize
230 {
231 *complete = true;
232 break;
233 }
234 }
235 }
236
237 *batches = all_batches.clone();
238 all_batches
239 };
240
241 if let Some(handle_val) = handle.take() {
243 let mut transport = self.transport.lock().await;
244 let _ = transport.close_result_set(handle_val).await;
246 }
247
248 Ok(all_batches)
249 }
250 ResultSetInner::RowCount { .. } => Err(QueryError::NoResultSet(
251 "Cannot fetch batches from row count result".to_string(),
252 )),
253 }
254 }
255
256 fn build_schema(columns: &[ColumnInfo]) -> Result<Arc<Schema>, ConversionError> {
258 let fields: Result<Vec<Field>, ConversionError> = columns
259 .iter()
260 .map(|col| {
261 let arrow_type = Self::exasol_datatype_to_arrow(&col.data_type)?;
263
264 Ok(Field::new(&col.name, arrow_type, true))
266 })
267 .collect();
268
269 Ok(Arc::new(Schema::new(fields?)))
270 }
271
272 fn exasol_datatype_to_arrow(
274 data_type: &crate::transport::messages::DataType,
275 ) -> Result<arrow::datatypes::DataType, ConversionError> {
276 use crate::types::ExasolType;
277
278 let exasol_type = match data_type.type_name.as_str() {
279 "BOOLEAN" => ExasolType::Boolean,
280 "CHAR" => ExasolType::Char {
281 size: data_type.size.unwrap_or(1) as usize,
282 },
283 "VARCHAR" => ExasolType::Varchar {
284 size: data_type.size.unwrap_or(2000000) as usize,
285 },
286 "DECIMAL" => ExasolType::Decimal {
287 precision: data_type.precision.unwrap_or(18) as u8,
288 scale: data_type.scale.unwrap_or(0) as i8,
289 },
290 "DOUBLE" => ExasolType::Double,
291 "DATE" => ExasolType::Date,
292 "TIMESTAMP" => ExasolType::Timestamp {
293 with_local_time_zone: data_type.with_local_time_zone.unwrap_or(false),
294 },
295 "INTERVAL YEAR TO MONTH" => ExasolType::IntervalYearToMonth,
296 "INTERVAL DAY TO SECOND" => ExasolType::IntervalDayToSecond {
297 precision: data_type.fraction.unwrap_or(3) as u8,
298 },
299 "GEOMETRY" => ExasolType::Geometry { srid: None },
300 "HASHTYPE" => ExasolType::Hashtype { byte_size: 16 },
301 _ => {
302 return Err(ConversionError::UnsupportedType {
303 exasol_type: data_type.type_name.clone(),
304 })
305 }
306 };
307
308 TypeMapper::exasol_to_arrow(&exasol_type, true)
309 }
310
311 fn column_major_to_record_batch(
318 data: &ResultData,
319 schema: &Arc<Schema>,
320 ) -> Result<RecordBatch, ConversionError> {
321 use arrow::array::*;
322 use serde_json::Value;
323
324 if data.data.is_empty() {
325 let empty_arrays: Vec<Arc<dyn Array>> = schema
327 .fields()
328 .iter()
329 .map(|field| new_empty_array(field.data_type()))
330 .collect();
331
332 return RecordBatch::try_new(Arc::clone(schema), empty_arrays)
333 .map_err(|e| ConversionError::ArrowError(e.to_string()));
334 }
335
336 let num_columns = schema.fields().len();
338 let column_values: Vec<Vec<&Value>> = (0..num_columns)
339 .map(|col_idx| {
340 data.data
341 .iter()
342 .map(|row| row.get(col_idx).unwrap_or(&Value::Null))
343 .collect()
344 })
345 .collect();
346
347 let mut arrays: Vec<Arc<dyn Array>> = Vec::new();
348
349 for (col_idx, field) in schema.fields().iter().enumerate() {
350 use arrow::datatypes::DataType;
351
352 let col_values = &column_values[col_idx];
354
355 let array: Arc<dyn Array> = match field.data_type() {
357 DataType::Boolean => {
358 let mut builder = BooleanBuilder::new();
359 for value in col_values {
360 if value.is_null() {
361 builder.append_null();
362 } else if let Some(b) = value.as_bool() {
363 builder.append_value(b);
364 } else {
365 builder.append_null();
366 }
367 }
368 Arc::new(builder.finish())
369 }
370 DataType::Int32 => {
371 let mut builder = Int32Builder::new();
372 for value in col_values {
373 if value.is_null() {
374 builder.append_null();
375 } else if let Some(i) = value.as_i64() {
376 builder.append_value(i as i32);
377 } else {
378 builder.append_null();
379 }
380 }
381 Arc::new(builder.finish())
382 }
383 DataType::Int64 => {
384 let mut builder = Int64Builder::new();
385 for value in col_values {
386 if value.is_null() {
387 builder.append_null();
388 } else if let Some(i) = value.as_i64() {
389 builder.append_value(i);
390 } else {
391 builder.append_null();
392 }
393 }
394 Arc::new(builder.finish())
395 }
396 DataType::Float64 => {
397 let mut builder = Float64Builder::new();
398 for value in col_values {
399 if value.is_null() {
400 builder.append_null();
401 } else if let Some(f) = value.as_f64() {
402 builder.append_value(f);
403 } else {
404 builder.append_null();
405 }
406 }
407 Arc::new(builder.finish())
408 }
409 DataType::Utf8 => {
410 let mut builder = StringBuilder::new();
411 for value in col_values {
412 if value.is_null() {
413 builder.append_null();
414 } else if let Some(s) = value.as_str() {
415 builder.append_value(s);
416 } else {
417 builder.append_value(value.to_string());
419 }
420 }
421 Arc::new(builder.finish())
422 }
423 DataType::Decimal128(precision, scale) => {
424 let mut builder = Decimal128Builder::new()
425 .with_precision_and_scale(*precision, *scale)
426 .map_err(|e| ConversionError::ArrowError(e.to_string()))?;
427
428 for value in col_values {
429 if value.is_null() {
430 builder.append_null();
431 } else if let Some(s) = value.as_str() {
432 let scaled = Self::parse_string_to_decimal(s, *scale)?;
434 builder.append_value(scaled);
435 } else if let Some(i) = value.as_i64() {
436 let scaled = i * 10i64.pow(*scale as u32);
438 builder.append_value(scaled as i128);
439 } else if let Some(f) = value.as_f64() {
440 let scaled = (f * 10f64.powi(*scale as i32)) as i128;
441 builder.append_value(scaled);
442 } else {
443 builder.append_null();
444 }
445 }
446 Arc::new(builder.finish())
447 }
448 DataType::Date32 => {
449 let mut builder = Date32Builder::new();
450 for value in col_values {
451 if value.is_null() {
452 builder.append_null();
453 } else if let Some(s) = value.as_str() {
454 match Self::parse_date_to_days(s) {
456 Ok(days) => builder.append_value(days),
457 Err(_) => builder.append_null(),
458 }
459 } else {
460 builder.append_null();
461 }
462 }
463 Arc::new(builder.finish())
464 }
465 DataType::Timestamp(_, _) => {
466 let mut builder = TimestampMicrosecondBuilder::new();
467 for value in col_values {
468 if value.is_null() {
469 builder.append_null();
470 } else if let Some(s) = value.as_str() {
471 match Self::parse_timestamp_to_micros(s) {
473 Ok(micros) => builder.append_value(micros),
474 Err(_) => builder.append_null(),
475 }
476 } else {
477 builder.append_null();
478 }
479 }
480 Arc::new(builder.finish())
481 }
482 _ => {
483 let mut builder = StringBuilder::new();
485 for value in col_values {
486 if value.is_null() {
487 builder.append_null();
488 } else {
489 builder.append_value(value.to_string());
490 }
491 }
492 Arc::new(builder.finish())
493 }
494 };
495
496 arrays.push(array);
497 }
498
499 RecordBatch::try_new(Arc::clone(schema), arrays)
500 .map_err(|e| ConversionError::ArrowError(e.to_string()))
501 }
502
503 fn parse_date_to_days(date_str: &str) -> Result<i32, ()> {
505 let parts: Vec<&str> = date_str.split('-').collect();
506 if parts.len() != 3 {
507 return Err(());
508 }
509
510 let year: i32 = parts[0].parse().map_err(|_| ())?;
511 let month: u32 = parts[1].parse().map_err(|_| ())?;
512 let day: u32 = parts[2].parse().map_err(|_| ())?;
513
514 if !(1..=12).contains(&month) || !(1..=31).contains(&day) {
515 return Err(());
516 }
517
518 let days_from_year =
520 (year - 1970) * 365 + (year - 1969) / 4 - (year - 1901) / 100 + (year - 1601) / 400;
521 let days_from_month = match month {
522 1 => 0,
523 2 => 31,
524 3 => 59,
525 4 => 90,
526 5 => 120,
527 6 => 151,
528 7 => 181,
529 8 => 212,
530 9 => 243,
531 10 => 273,
532 11 => 304,
533 12 => 334,
534 _ => return Err(()),
535 };
536
537 let is_leap_year = (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0);
539 let leap_adjustment = if month > 2 && is_leap_year { 1 } else { 0 };
540
541 Ok(days_from_year + days_from_month + day as i32 - 1 + leap_adjustment)
542 }
543
544 fn parse_timestamp_to_micros(timestamp_str: &str) -> Result<i64, ()> {
546 let parts: Vec<&str> = timestamp_str.split(' ').collect();
548 if parts.is_empty() {
549 return Err(());
550 }
551
552 let days = Self::parse_date_to_days(parts[0])?;
554 let mut micros = days as i64 * 86400 * 1_000_000;
555
556 if parts.len() > 1 {
558 let time_parts: Vec<&str> = parts[1].split(':').collect();
559 if time_parts.len() >= 2 {
560 let hours: i64 = time_parts[0].parse().map_err(|_| ())?;
561 let minutes: i64 = time_parts[1].parse().map_err(|_| ())?;
562
563 micros += hours * 3600 * 1_000_000;
564 micros += minutes * 60 * 1_000_000;
565
566 if time_parts.len() >= 3 {
567 let sec_parts: Vec<&str> = time_parts[2].split('.').collect();
569 let seconds: i64 = sec_parts[0].parse().map_err(|_| ())?;
570
571 micros += seconds * 1_000_000;
572
573 if sec_parts.len() > 1 {
574 let frac = sec_parts[1];
576 let frac_micros = if frac.len() <= 6 {
577 let padding = 6 - frac.len();
578 let padded = format!("{}{}", frac, "0".repeat(padding));
579 padded.parse::<i64>().unwrap_or(0)
580 } else {
581 frac[..6].parse::<i64>().unwrap_or(0)
582 };
583 micros += frac_micros;
584 }
585 }
586 }
587 }
588
589 Ok(micros)
590 }
591
592 fn parse_string_to_decimal(s: &str, scale: i8) -> Result<i128, ConversionError> {
596 if s.is_empty() {
598 return Err(ConversionError::InvalidFormat(
599 "Empty decimal string".to_string(),
600 ));
601 }
602
603 let parts: Vec<&str> = s.split('.').collect();
605
606 let (integer_part, decimal_part) = match parts.len() {
607 1 => (parts[0], ""),
608 2 => (parts[0], parts[1]),
609 _ => {
610 return Err(ConversionError::InvalidFormat(format!(
611 "Invalid decimal format: {}",
612 s
613 )));
614 }
615 };
616
617 let mut result: i128 = integer_part.parse().map_err(|_| {
619 ConversionError::InvalidFormat(format!("Invalid integer part: {}", integer_part))
620 })?;
621
622 result = result
624 .checked_mul(10_i128.pow(scale as u32))
625 .ok_or_else(|| ConversionError::InvalidFormat("Decimal overflow".to_string()))?;
626
627 if !decimal_part.is_empty() {
629 let decimal_digits = decimal_part.len().min(scale as usize);
630 let decimal_value: i128 = decimal_part[..decimal_digits].parse().map_err(|_| {
631 ConversionError::InvalidFormat(format!("Invalid decimal part: {}", decimal_part))
632 })?;
633
634 let scale_diff = scale as usize - decimal_digits;
636 let scaled_decimal = decimal_value * 10_i128.pow(scale_diff as u32);
637
638 result = result
639 .checked_add(if integer_part.starts_with('-') {
640 -scaled_decimal
641 } else {
642 scaled_decimal
643 })
644 .ok_or_else(|| ConversionError::InvalidFormat("Decimal overflow".to_string()))?;
645 }
646
647 Ok(result)
648 }
649}
650
/// Iterator over the record batches of a result set, created by
/// `ResultSet::into_iterator`.
pub struct ResultSetIterator {
    /// Result-set handle used to fetch further data and to close the result
    /// set; `None` when there is nothing to fetch from.
    handle: Option<ResultSetHandle>,
    /// Shared transport used for fetching and closing.
    transport: Arc<Mutex<dyn TransportProtocol>>,
    /// Schema and row-count information for the result.
    metadata: QueryMetadata,
    /// Batches received so far, including those already handed out.
    batches: Vec<RecordBatch>,
    /// Index into `batches` of the next batch to hand out.
    current_index: usize,
    /// Whether no further data will be fetched.
    complete: bool,
}
668
669impl ResultSetIterator {
670 pub fn metadata(&self) -> &QueryMetadata {
672 &self.metadata
673 }
674
675 async fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>, QueryError> {
677 if self.complete {
678 return Ok(None);
679 }
680
681 let handle = match self.handle {
682 Some(h) => h,
683 None => {
684 self.complete = true;
685 return Ok(None);
686 }
687 };
688
689 let mut transport = self.transport.lock().await;
690 let result_data = transport
691 .fetch_results(handle)
692 .await
693 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
694
695 if result_data.data.is_empty() {
696 self.complete = true;
697 return Ok(None);
698 }
699
700 let batch = ResultSet::column_major_to_record_batch(&result_data, &self.metadata.schema)
701 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
702
703 Ok(Some(batch))
704 }
705
706 pub fn next_batch(&mut self) -> Option<Result<RecordBatch, QueryError>> {
710 if self.current_index < self.batches.len() {
712 let batch = self.batches[self.current_index].clone();
713 self.current_index += 1;
714 return Some(Ok(batch));
715 }
716
717 if self.complete {
719 return None;
720 }
721
722 let runtime = tokio::runtime::Handle::try_current();
724 if let Ok(handle) = runtime {
725 let result = handle.block_on(self.fetch_next_batch());
726 match result {
727 Ok(Some(batch)) => {
728 self.batches.push(batch.clone());
729 self.current_index += 1;
730 Some(Ok(batch))
731 }
732 Ok(None) => None,
733 Err(e) => Some(Err(e)),
734 }
735 } else {
736 Some(Err(QueryError::InvalidState(
738 "No async runtime available".to_string(),
739 )))
740 }
741 }
742
743 pub async fn close(mut self) -> Result<(), QueryError> {
745 if let Some(handle) = self.handle.take() {
746 let mut transport = self.transport.lock().await;
747 transport
748 .close_result_set(handle)
749 .await
750 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
751 }
752 Ok(())
753 }
754}
755
756impl Iterator for ResultSetIterator {
758 type Item = Result<RecordBatch, QueryError>;
759
760 fn next(&mut self) -> Option<Self::Item> {
761 self.next_batch()
762 }
763}
764
765#[cfg(test)]
766mod tests {
767 use super::*;
768 use crate::transport::messages::{ColumnInfo, DataType};
769 use crate::transport::protocol::{
770 PreparedStatementHandle, QueryResult as TransportQueryResult,
771 };
772 use arrow::array::Array;
773 use async_trait::async_trait;
774 use mockall::mock;
775
776 mock! {
778 pub Transport {}
779
780 #[async_trait]
781 impl TransportProtocol for Transport {
782 async fn connect(&mut self, params: &crate::transport::protocol::ConnectionParams) -> Result<(), crate::error::TransportError>;
783 async fn authenticate(&mut self, credentials: &crate::transport::protocol::Credentials) -> Result<crate::transport::messages::SessionInfo, crate::error::TransportError>;
784 async fn execute_query(&mut self, sql: &str) -> Result<TransportQueryResult, crate::error::TransportError>;
785 async fn fetch_results(&mut self, handle: ResultSetHandle) -> Result<ResultData, crate::error::TransportError>;
786 async fn close_result_set(&mut self, handle: ResultSetHandle) -> Result<(), crate::error::TransportError>;
787 async fn create_prepared_statement(&mut self, sql: &str) -> Result<PreparedStatementHandle, crate::error::TransportError>;
788 async fn execute_prepared_statement(&mut self, handle: &PreparedStatementHandle, parameters: Option<Vec<Vec<serde_json::Value>>>) -> Result<TransportQueryResult, crate::error::TransportError>;
789 async fn close_prepared_statement(&mut self, handle: &PreparedStatementHandle) -> Result<(), crate::error::TransportError>;
790 async fn close(&mut self) -> Result<(), crate::error::TransportError>;
791 fn is_connected(&self) -> bool;
792 }
793 }
794
795 #[tokio::test]
796 async fn test_result_set_row_count() {
797 let mock_transport = MockTransport::new();
798 let transport: Arc<Mutex<dyn TransportProtocol>> = Arc::new(Mutex::new(mock_transport));
799
800 let result = TransportQueryResult::RowCount { count: 42 };
801 let result_set = ResultSet::from_transport_result(result, transport).unwrap();
802
803 assert_eq!(result_set.row_count(), Some(42));
804 assert!(!result_set.is_stream());
805 assert!(result_set.metadata().is_none());
806 }
807
808 #[tokio::test]
809 async fn test_result_set_stream() {
810 let mock_transport = MockTransport::new();
811 let transport: Arc<Mutex<dyn TransportProtocol>> = Arc::new(Mutex::new(mock_transport));
812
813 let data = ResultData {
817 columns: vec![
818 ColumnInfo {
819 name: "id".to_string(),
820 data_type: DataType {
821 type_name: "DECIMAL".to_string(),
822 precision: Some(18),
823 scale: Some(0),
824 size: None,
825 character_set: None,
826 with_local_time_zone: None,
827 fraction: None,
828 },
829 },
830 ColumnInfo {
831 name: "name".to_string(),
832 data_type: DataType {
833 type_name: "VARCHAR".to_string(),
834 precision: None,
835 scale: None,
836 size: Some(100),
837 character_set: Some("UTF8".to_string()),
838 with_local_time_zone: None,
839 fraction: None,
840 },
841 },
842 ],
843 data: vec![
844 vec![serde_json::json!(1), serde_json::json!("Alice")],
845 vec![serde_json::json!(2), serde_json::json!("Bob")],
846 ],
847 total_rows: 2,
848 };
849
850 let result = TransportQueryResult::ResultSet {
851 handle: Some(ResultSetHandle::new(1)),
852 data,
853 };
854
855 let result_set = ResultSet::from_transport_result(result, transport).unwrap();
856
857 assert!(result_set.row_count().is_none());
858 assert!(result_set.is_stream());
859
860 let metadata = result_set.metadata().unwrap();
861 assert_eq!(metadata.column_count, 2);
862 assert_eq!(metadata.total_rows, Some(2));
863 assert_eq!(metadata.column_names(), vec!["id", "name"]);
864 }
865
866 #[tokio::test]
867 async fn test_result_set_to_record_batch() {
868 let schema = Arc::new(Schema::new(vec![
869 Field::new("id", arrow::datatypes::DataType::Decimal128(18, 0), true),
870 Field::new("name", arrow::datatypes::DataType::Utf8, true),
871 Field::new("active", arrow::datatypes::DataType::Boolean, true),
872 ]));
873
874 let data = ResultData {
878 columns: vec![],
879 data: vec![
880 vec![
881 serde_json::json!(1),
882 serde_json::json!("Alice"),
883 serde_json::json!(true),
884 ],
885 vec![
886 serde_json::json!(2),
887 serde_json::json!("Bob"),
888 serde_json::json!(false),
889 ],
890 ],
891 total_rows: 2,
892 };
893
894 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
895
896 assert_eq!(batch.num_rows(), 2);
897 assert_eq!(batch.num_columns(), 3);
898 }
899
900 #[tokio::test]
901 async fn test_single_row_single_column() {
902 let schema = Arc::new(Schema::new(vec![Field::new(
904 "answer",
905 arrow::datatypes::DataType::Decimal128(18, 0),
906 true,
907 )]));
908
909 let data = ResultData {
911 columns: vec![],
912 data: vec![
913 vec![serde_json::json!(42)], ],
915 total_rows: 1,
916 };
917
918 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
919
920 assert_eq!(batch.num_rows(), 1, "Should have exactly 1 row");
921 assert_eq!(batch.num_columns(), 1, "Should have exactly 1 column");
922 }
923
924 #[tokio::test]
925 async fn test_single_row_two_columns() {
926 let schema = Arc::new(Schema::new(vec![
928 Field::new(
929 "answer",
930 arrow::datatypes::DataType::Decimal128(18, 0),
931 true,
932 ),
933 Field::new("greeting", arrow::datatypes::DataType::Utf8, true),
934 ]));
935
936 let data = ResultData {
938 columns: vec![],
939 data: vec![
940 vec![serde_json::json!(42), serde_json::json!("hello")], ],
942 total_rows: 1,
943 };
944
945 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
946
947 assert_eq!(batch.num_rows(), 1, "Should have exactly 1 row");
948 assert_eq!(batch.num_columns(), 2, "Should have exactly 2 columns");
949 }
950
951 #[tokio::test]
952 async fn test_ten_rows_two_columns() {
953 let schema = Arc::new(Schema::new(vec![
955 Field::new("id", arrow::datatypes::DataType::Decimal128(18, 0), true),
956 Field::new("label", arrow::datatypes::DataType::Utf8, true),
957 ]));
958
959 let data = ResultData {
961 columns: vec![],
962 data: vec![
963 vec![serde_json::json!(1), serde_json::json!("Row 1")],
964 vec![serde_json::json!(2), serde_json::json!("Row 2")],
965 vec![serde_json::json!(3), serde_json::json!("Row 3")],
966 vec![serde_json::json!(4), serde_json::json!("Row 4")],
967 vec![serde_json::json!(5), serde_json::json!("Row 5")],
968 vec![serde_json::json!(6), serde_json::json!("Row 6")],
969 vec![serde_json::json!(7), serde_json::json!("Row 7")],
970 vec![serde_json::json!(8), serde_json::json!("Row 8")],
971 vec![serde_json::json!(9), serde_json::json!("Row 9")],
972 vec![serde_json::json!(10), serde_json::json!("Row 10")],
973 ],
974 total_rows: 10,
975 };
976
977 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
978
979 assert_eq!(batch.num_rows(), 10, "Should have exactly 10 rows");
980 assert_eq!(batch.num_columns(), 2, "Should have exactly 2 columns");
981 }
982
983 #[tokio::test]
984 async fn test_schema_building() {
985 let columns = vec![
986 ColumnInfo {
987 name: "id".to_string(),
988 data_type: DataType {
989 type_name: "DECIMAL".to_string(),
990 precision: Some(18),
991 scale: Some(0),
992 size: None,
993 character_set: None,
994 with_local_time_zone: None,
995 fraction: None,
996 },
997 },
998 ColumnInfo {
999 name: "name".to_string(),
1000 data_type: DataType {
1001 type_name: "VARCHAR".to_string(),
1002 precision: None,
1003 scale: None,
1004 size: Some(100),
1005 character_set: Some("UTF8".to_string()),
1006 with_local_time_zone: None,
1007 fraction: None,
1008 },
1009 },
1010 ColumnInfo {
1011 name: "created_at".to_string(),
1012 data_type: DataType {
1013 type_name: "TIMESTAMP".to_string(),
1014 precision: None,
1015 scale: None,
1016 size: None,
1017 character_set: None,
1018 with_local_time_zone: Some(false),
1019 fraction: None,
1020 },
1021 },
1022 ];
1023
1024 let schema = ResultSet::build_schema(&columns).unwrap();
1025
1026 assert_eq!(schema.fields().len(), 3);
1027 assert_eq!(schema.field(0).name(), "id");
1028 assert_eq!(schema.field(1).name(), "name");
1029 assert_eq!(schema.field(2).name(), "created_at");
1030 }
1031
1032 #[test]
1033 fn test_query_metadata() {
1034 let schema = Arc::new(Schema::new(vec![
1035 Field::new("id", arrow::datatypes::DataType::Int64, false),
1036 Field::new("name", arrow::datatypes::DataType::Utf8, true),
1037 ]));
1038
1039 let metadata = QueryMetadata::new(Arc::clone(&schema), Some(100)).with_execution_time(250);
1040
1041 assert_eq!(metadata.column_count, 2);
1042 assert_eq!(metadata.total_rows, Some(100));
1043 assert_eq!(metadata.execution_time_ms, Some(250));
1044 assert_eq!(metadata.column_names(), vec!["id", "name"]);
1045 }
1046
1047 #[test]
1052 fn test_query_metadata_column_types_returns_correct_types() {
1053 let schema = Arc::new(Schema::new(vec![
1054 Field::new("id", arrow::datatypes::DataType::Int64, false),
1055 Field::new("name", arrow::datatypes::DataType::Utf8, true),
1056 Field::new("active", arrow::datatypes::DataType::Boolean, true),
1057 ]));
1058
1059 let metadata = QueryMetadata::new(Arc::clone(&schema), Some(100));
1060 let types = metadata.column_types();
1061
1062 assert_eq!(types.len(), 3);
1063 assert_eq!(types[0], &arrow::datatypes::DataType::Int64);
1064 assert_eq!(types[1], &arrow::datatypes::DataType::Utf8);
1065 assert_eq!(types[2], &arrow::datatypes::DataType::Boolean);
1066 }
1067
1068 #[test]
1069 fn test_query_metadata_column_types_empty_schema() {
1070 let fields: Vec<Field> = vec![];
1071 let schema = Arc::new(Schema::new(fields));
1072
1073 let metadata = QueryMetadata::new(Arc::clone(&schema), None);
1074 let types = metadata.column_types();
1075
1076 assert!(types.is_empty());
1077 }
1078
1079 #[test]
1084 fn test_parse_date_to_days_unix_epoch() {
1085 let result = ResultSet::parse_date_to_days("1970-01-01").unwrap();
1086 assert_eq!(result, 0);
1087 }
1088
1089 #[test]
1090 fn test_parse_date_to_days_after_epoch() {
1091 let result = ResultSet::parse_date_to_days("1970-01-02").unwrap();
1092 assert_eq!(result, 1);
1093 }
1094
1095 #[test]
1096 fn test_parse_date_to_days_year_2000() {
1097 let result = ResultSet::parse_date_to_days("2000-01-01").unwrap();
1099 assert_eq!(result, 10957);
1100 }
1101
1102 #[test]
1103 fn test_parse_date_to_days_leap_year() {
1104 let result = ResultSet::parse_date_to_days("2000-03-01").unwrap();
1106 assert_eq!(result, 11017);
1109 }
1110
1111 #[test]
1112 fn test_parse_date_to_days_non_leap_year() {
1113 let result = ResultSet::parse_date_to_days("2001-03-01").unwrap();
1115 assert_eq!(result, 11382);
1118 }
1119
1120 #[test]
1121 fn test_parse_date_to_days_before_epoch() {
1122 let result = ResultSet::parse_date_to_days("1969-12-31").unwrap();
1124 assert_eq!(result, -1);
1125 }
1126
1127 #[test]
1128 fn test_parse_date_to_days_invalid_format_wrong_separator() {
1129 let result = ResultSet::parse_date_to_days("2000/01/01");
1130 assert!(result.is_err());
1131 }
1132
1133 #[test]
1134 fn test_parse_date_to_days_invalid_format_missing_parts() {
1135 let result = ResultSet::parse_date_to_days("2000-01");
1136 assert!(result.is_err());
1137 }
1138
1139 #[test]
1140 fn test_parse_date_to_days_invalid_month_zero() {
1141 let result = ResultSet::parse_date_to_days("2000-00-01");
1142 assert!(result.is_err());
1143 }
1144
1145 #[test]
1146 fn test_parse_date_to_days_invalid_month_thirteen() {
1147 let result = ResultSet::parse_date_to_days("2000-13-01");
1148 assert!(result.is_err());
1149 }
1150
1151 #[test]
1152 fn test_parse_date_to_days_invalid_day_zero() {
1153 let result = ResultSet::parse_date_to_days("2000-01-00");
1154 assert!(result.is_err());
1155 }
1156
1157 #[test]
1158 fn test_parse_date_to_days_invalid_day_thirty_two() {
1159 let result = ResultSet::parse_date_to_days("2000-01-32");
1160 assert!(result.is_err());
1161 }
1162
1163 #[test]
1164 fn test_parse_date_to_days_invalid_non_numeric() {
1165 let result = ResultSet::parse_date_to_days("YYYY-MM-DD");
1166 assert!(result.is_err());
1167 }
1168
1169 #[test]
1174 fn test_parse_timestamp_to_micros_date_only() {
1175 let result = ResultSet::parse_timestamp_to_micros("1970-01-01").unwrap();
1176 assert_eq!(result, 0);
1177 }
1178
1179 #[test]
1180 fn test_parse_timestamp_to_micros_with_time() {
1181 let result = ResultSet::parse_timestamp_to_micros("1970-01-01 01:00:00").unwrap();
1183 assert_eq!(result, 3600 * 1_000_000);
1184 }
1185
1186 #[test]
1187 fn test_parse_timestamp_to_micros_with_minutes() {
1188 let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:30:00").unwrap();
1190 assert_eq!(result, 30 * 60 * 1_000_000);
1191 }
1192
1193 #[test]
1194 fn test_parse_timestamp_to_micros_with_seconds() {
1195 let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:45").unwrap();
1197 assert_eq!(result, 45 * 1_000_000);
1198 }
1199
1200 #[test]
1201 fn test_parse_timestamp_to_micros_with_fractional_seconds_3_digits() {
1202 let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:00.123").unwrap();
1204 assert_eq!(result, 123000);
1205 }
1206
1207 #[test]
1208 fn test_parse_timestamp_to_micros_with_fractional_seconds_6_digits() {
1209 let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:00.123456").unwrap();
1211 assert_eq!(result, 123456);
1212 }
1213
1214 #[test]
1215 fn test_parse_timestamp_to_micros_with_fractional_seconds_more_than_6_digits() {
1216 let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:00.1234567").unwrap();
1218 assert_eq!(result, 123456);
1219 }
1220
1221 #[test]
1222 fn test_parse_timestamp_to_micros_with_fractional_seconds_1_digit() {
1223 let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:00.1").unwrap();
1225 assert_eq!(result, 100000);
1226 }
1227
1228 #[test]
1229 fn test_parse_timestamp_to_micros_complex_timestamp() {
1230 let result = ResultSet::parse_timestamp_to_micros("2000-06-15 12:30:45.500").unwrap();
1235
1236 let days_micros: i64 =
1238 ResultSet::parse_date_to_days("2000-06-15").unwrap() as i64 * 86400 * 1_000_000;
1239 let time_micros: i64 = (12 * 3600 + 30 * 60 + 45) * 1_000_000 + 500000;
1240 assert_eq!(result, days_micros + time_micros);
1241 }
1242
/// An empty input string must be rejected with an error.
#[test]
fn test_parse_timestamp_to_micros_empty_string() {
    assert!(ResultSet::parse_timestamp_to_micros("").is_err());
}
1248
/// A time portion without seconds (`01:30`) is accepted and parses to
/// 5400 seconds expressed in microseconds.
#[test]
fn test_parse_timestamp_to_micros_hours_and_minutes_only() {
    let input = "1970-01-01 01:30";
    let micros = ResultSet::parse_timestamp_to_micros(input).unwrap();
    assert_eq!(micros, 5400 * 1_000_000);
}
1256
/// An integer string at scale 2 is expected to produce `12300` for `"123"`.
#[test]
fn test_parse_string_to_decimal_integer() {
    let (text, scale) = ("123", 2);
    let unscaled = ResultSet::parse_string_to_decimal(text, scale).unwrap();
    assert_eq!(unscaled, 12300);
}
1267
/// `"123.45"` at scale 2 is expected to produce `12345`.
#[test]
fn test_parse_string_to_decimal_with_decimal_point() {
    let (text, scale) = ("123.45", 2);
    let unscaled = ResultSet::parse_string_to_decimal(text, scale).unwrap();
    assert_eq!(unscaled, 12345);
}
1274
/// A leading minus sign is preserved: `"-123.45"` at scale 2 gives `-12345`.
#[test]
fn test_parse_string_to_decimal_negative() {
    let (text, scale) = ("-123.45", 2);
    let unscaled = ResultSet::parse_string_to_decimal(text, scale).unwrap();
    assert_eq!(unscaled, -12345);
}
1281
/// With scale 0 the integer value is returned as-is: `"123"` gives `123`.
#[test]
fn test_parse_string_to_decimal_zero_scale() {
    let (text, scale) = ("123", 0);
    let unscaled = ResultSet::parse_string_to_decimal(text, scale).unwrap();
    assert_eq!(unscaled, 123);
}
1287
/// A scale larger than the number of fractional digits pads the value:
/// `"1.5"` at scale 6 gives `1500000`.
#[test]
fn test_parse_string_to_decimal_high_scale() {
    let (text, scale) = ("1.5", 6);
    let unscaled = ResultSet::parse_string_to_decimal(text, scale).unwrap();
    assert_eq!(unscaled, 1500000);
}
1294
/// Fractional digits beyond the scale are dropped: `"1.123456"` at scale 3 gives `1123`.
#[test]
fn test_parse_string_to_decimal_truncates_extra_decimals() {
    let (text, scale) = ("1.123456", 3);
    let unscaled = ResultSet::parse_string_to_decimal(text, scale).unwrap();
    assert_eq!(unscaled, 1123);
}
1302
/// `"0"` at scale 2 is expected to produce `0`.
#[test]
fn test_parse_string_to_decimal_zero() {
    let (text, scale) = ("0", 2);
    let unscaled = ResultSet::parse_string_to_decimal(text, scale).unwrap();
    assert_eq!(unscaled, 0);
}
1308
/// `"-0"` at scale 2 is expected to produce plain `0`.
#[test]
fn test_parse_string_to_decimal_negative_zero() {
    let (text, scale) = ("-0", 2);
    let unscaled = ResultSet::parse_string_to_decimal(text, scale).unwrap();
    assert_eq!(unscaled, 0);
}
1314
/// An empty string is not a valid decimal and must produce an error.
#[test]
fn test_parse_string_to_decimal_empty_string() {
    assert!(ResultSet::parse_string_to_decimal("", 2).is_err());
}
1320
/// Input with more than one decimal point (`"1.2.3"`) must produce an error.
#[test]
fn test_parse_string_to_decimal_multiple_decimal_points() {
    assert!(ResultSet::parse_string_to_decimal("1.2.3", 2).is_err());
}
1326
/// Non-numeric input (`"abc"`) must produce an error.
#[test]
fn test_parse_string_to_decimal_non_numeric() {
    assert!(ResultSet::parse_string_to_decimal("abc", 2).is_err());
}
1332
/// An 18-digit integer string at scale 0 must round-trip exactly as an `i128`.
#[test]
fn test_parse_string_to_decimal_large_value() {
    let text = "999999999999999999";
    let unscaled = ResultSet::parse_string_to_decimal(text, 0).unwrap();
    assert_eq!(unscaled, 999999999999999999_i128);
}
1338
/// A `BOOLEAN` type descriptor with no attributes maps to Arrow `Boolean`.
#[test]
fn test_exasol_datatype_to_arrow_boolean() {
    use arrow::datatypes::DataType as ArrowDataType;

    let exasol_type = DataType {
        type_name: String::from("BOOLEAN"),
        precision: None,
        scale: None,
        size: None,
        character_set: None,
        with_local_time_zone: None,
        fraction: None,
    };

    let arrow_type = ResultSet::exasol_datatype_to_arrow(&exasol_type).unwrap();
    assert_eq!(arrow_type, ArrowDataType::Boolean);
}
1358
/// A `CHAR` type descriptor with an explicit size of 10 maps to Arrow `Utf8`.
#[test]
fn test_exasol_datatype_to_arrow_char() {
    use arrow::datatypes::DataType as ArrowDataType;

    let exasol_type = DataType {
        type_name: String::from("CHAR"),
        size: Some(10),
        precision: None,
        scale: None,
        character_set: None,
        with_local_time_zone: None,
        fraction: None,
    };

    let arrow_type = ResultSet::exasol_datatype_to_arrow(&exasol_type).unwrap();
    assert_eq!(arrow_type, ArrowDataType::Utf8);
}
1374
/// A `CHAR` type descriptor without a size still maps to Arrow `Utf8`.
#[test]
fn test_exasol_datatype_to_arrow_char_default_size() {
    use arrow::datatypes::DataType as ArrowDataType;

    let exasol_type = DataType {
        type_name: String::from("CHAR"),
        // Size is deliberately left unset for this case.
        size: None,
        precision: None,
        scale: None,
        character_set: None,
        with_local_time_zone: None,
        fraction: None,
    };

    let arrow_type = ResultSet::exasol_datatype_to_arrow(&exasol_type).unwrap();
    assert_eq!(arrow_type, ArrowDataType::Utf8);
}
1390
/// A `VARCHAR` type descriptor with an explicit size of 100 maps to Arrow `Utf8`.
#[test]
fn test_exasol_datatype_to_arrow_varchar() {
    use arrow::datatypes::DataType as ArrowDataType;

    let exasol_type = DataType {
        type_name: String::from("VARCHAR"),
        size: Some(100),
        precision: None,
        scale: None,
        character_set: None,
        with_local_time_zone: None,
        fraction: None,
    };

    let arrow_type = ResultSet::exasol_datatype_to_arrow(&exasol_type).unwrap();
    assert_eq!(arrow_type, ArrowDataType::Utf8);
}
1406
/// A `VARCHAR` type descriptor without a size still maps to Arrow `Utf8`.
#[test]
fn test_exasol_datatype_to_arrow_varchar_default_size() {
    use arrow::datatypes::DataType as ArrowDataType;

    let exasol_type = DataType {
        type_name: String::from("VARCHAR"),
        // Size is deliberately left unset for this case.
        size: None,
        precision: None,
        scale: None,
        character_set: None,
        with_local_time_zone: None,
        fraction: None,
    };

    let arrow_type = ResultSet::exasol_datatype_to_arrow(&exasol_type).unwrap();
    assert_eq!(arrow_type, ArrowDataType::Utf8);
}
1422
/// A `DECIMAL` descriptor with precision 18 and scale 2 maps to `Decimal128(18, 2)`.
#[test]
fn test_exasol_datatype_to_arrow_decimal() {
    use arrow::datatypes::DataType as ArrowDataType;

    let exasol_type = DataType {
        type_name: String::from("DECIMAL"),
        precision: Some(18),
        scale: Some(2),
        size: None,
        character_set: None,
        with_local_time_zone: None,
        fraction: None,
    };

    let arrow_type = ResultSet::exasol_datatype_to_arrow(&exasol_type).unwrap();
    assert_eq!(arrow_type, ArrowDataType::Decimal128(18, 2));
}
1438
/// A `DECIMAL` descriptor with neither precision nor scale is expected to map
/// to `Decimal128(18, 0)`.
#[test]
fn test_exasol_datatype_to_arrow_decimal_default_precision_scale() {
    use arrow::datatypes::DataType as ArrowDataType;

    let exasol_type = DataType {
        type_name: String::from("DECIMAL"),
        // Precision and scale are deliberately left unset for this case.
        precision: None,
        scale: None,
        size: None,
        character_set: None,
        with_local_time_zone: None,
        fraction: None,
    };

    let arrow_type = ResultSet::exasol_datatype_to_arrow(&exasol_type).unwrap();
    assert_eq!(arrow_type, ArrowDataType::Decimal128(18, 0));
}
1454
/// A `DOUBLE` type descriptor maps to Arrow `Float64`.
#[test]
fn test_exasol_datatype_to_arrow_double() {
    use arrow::datatypes::DataType as ArrowDataType;

    let exasol_type = DataType {
        type_name: String::from("DOUBLE"),
        precision: None,
        scale: None,
        size: None,
        character_set: None,
        with_local_time_zone: None,
        fraction: None,
    };

    let arrow_type = ResultSet::exasol_datatype_to_arrow(&exasol_type).unwrap();
    assert_eq!(arrow_type, ArrowDataType::Float64);
}
1470
/// A `DATE` type descriptor maps to Arrow `Date32`.
#[test]
fn test_exasol_datatype_to_arrow_date() {
    use arrow::datatypes::DataType as ArrowDataType;

    let exasol_type = DataType {
        type_name: String::from("DATE"),
        precision: None,
        scale: None,
        size: None,
        character_set: None,
        with_local_time_zone: None,
        fraction: None,
    };

    let arrow_type = ResultSet::exasol_datatype_to_arrow(&exasol_type).unwrap();
    assert_eq!(arrow_type, ArrowDataType::Date32);
}
1486
/// `TIMESTAMP` with `with_local_time_zone: Some(false)` maps to a microsecond
/// Arrow timestamp that carries no time zone.
#[test]
fn test_exasol_datatype_to_arrow_timestamp_without_tz() {
    use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};

    let data_type = DataType {
        type_name: "TIMESTAMP".to_string(),
        precision: None,
        scale: None,
        size: None,
        character_set: None,
        with_local_time_zone: Some(false),
        fraction: None,
    };

    let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
    // The expected value is fully concrete, so compare with `assert_eq!`:
    // unlike `assert!(matches!(..))` it prints the actual type on failure.
    assert_eq!(
        result,
        ArrowDataType::Timestamp(TimeUnit::Microsecond, None)
    );
}
1505
/// `TIMESTAMP` with `with_local_time_zone: Some(true)` maps to a microsecond
/// Arrow timestamp that carries some time zone (the exact zone is not checked).
#[test]
fn test_exasol_datatype_to_arrow_timestamp_with_tz() {
    use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};

    let data_type = DataType {
        type_name: "TIMESTAMP".to_string(),
        precision: None,
        scale: None,
        size: None,
        character_set: None,
        with_local_time_zone: Some(true),
        fraction: None,
    };

    let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
    // The zone value is intentionally left open, so a pattern match is used;
    // the message makes a failure show what was actually produced.
    assert!(
        matches!(
            result,
            ArrowDataType::Timestamp(TimeUnit::Microsecond, Some(_))
        ),
        "expected a microsecond timestamp with a time zone, got {:?}",
        result
    );
}
1524
/// `TIMESTAMP` with `with_local_time_zone` unset is expected to map to a
/// microsecond Arrow timestamp that carries no time zone.
#[test]
fn test_exasol_datatype_to_arrow_timestamp_default_tz() {
    use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};

    let data_type = DataType {
        type_name: "TIMESTAMP".to_string(),
        precision: None,
        scale: None,
        size: None,
        character_set: None,
        // Left unset on purpose: this test covers the missing-flag case.
        with_local_time_zone: None,
        fraction: None,
    };

    let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
    // The expected value is fully concrete, so compare with `assert_eq!`:
    // unlike `assert!(matches!(..))` it prints the actual type on failure.
    assert_eq!(
        result,
        ArrowDataType::Timestamp(TimeUnit::Microsecond, None)
    );
}
1543
/// `INTERVAL YEAR TO MONTH` maps to an Arrow `Interval(MonthDayNano)`.
#[test]
fn test_exasol_datatype_to_arrow_interval_year_to_month() {
    use arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};

    let data_type = DataType {
        type_name: "INTERVAL YEAR TO MONTH".to_string(),
        precision: None,
        scale: None,
        size: None,
        character_set: None,
        with_local_time_zone: None,
        fraction: None,
    };

    let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
    // The expected value is fully concrete, so compare with `assert_eq!`:
    // unlike `assert!(matches!(..))` it prints the actual type on failure.
    assert_eq!(result, ArrowDataType::Interval(IntervalUnit::MonthDayNano));
}
1562
/// `INTERVAL DAY TO SECOND` with `fraction: Some(6)` maps to an Arrow
/// `Interval(MonthDayNano)`.
#[test]
fn test_exasol_datatype_to_arrow_interval_day_to_second() {
    use arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};

    let data_type = DataType {
        type_name: "INTERVAL DAY TO SECOND".to_string(),
        precision: None,
        scale: None,
        size: None,
        character_set: None,
        with_local_time_zone: None,
        fraction: Some(6),
    };

    let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
    // The expected value is fully concrete, so compare with `assert_eq!`:
    // unlike `assert!(matches!(..))` it prints the actual type on failure.
    assert_eq!(result, ArrowDataType::Interval(IntervalUnit::MonthDayNano));
}
1581
/// `INTERVAL DAY TO SECOND` with `fraction` unset still maps to an Arrow
/// `Interval(MonthDayNano)`.
#[test]
fn test_exasol_datatype_to_arrow_interval_day_to_second_default_fraction() {
    use arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};

    let data_type = DataType {
        type_name: "INTERVAL DAY TO SECOND".to_string(),
        precision: None,
        scale: None,
        size: None,
        character_set: None,
        with_local_time_zone: None,
        // Left unset on purpose: this test covers the missing-fraction case.
        fraction: None,
    };

    let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
    // The expected value is fully concrete, so compare with `assert_eq!`:
    // unlike `assert!(matches!(..))` it prints the actual type on failure.
    assert_eq!(result, ArrowDataType::Interval(IntervalUnit::MonthDayNano));
}
1600
/// A `GEOMETRY` type descriptor maps to Arrow `Binary`.
#[test]
fn test_exasol_datatype_to_arrow_geometry() {
    use arrow::datatypes::DataType as ArrowDataType;

    let exasol_type = DataType {
        type_name: String::from("GEOMETRY"),
        precision: None,
        scale: None,
        size: None,
        character_set: None,
        with_local_time_zone: None,
        fraction: None,
    };

    let arrow_type = ResultSet::exasol_datatype_to_arrow(&exasol_type).unwrap();
    assert_eq!(arrow_type, ArrowDataType::Binary);
}
1616
/// A `HASHTYPE` type descriptor maps to Arrow `Binary`.
#[test]
fn test_exasol_datatype_to_arrow_hashtype() {
    use arrow::datatypes::DataType as ArrowDataType;

    let exasol_type = DataType {
        type_name: String::from("HASHTYPE"),
        precision: None,
        scale: None,
        size: None,
        character_set: None,
        with_local_time_zone: None,
        fraction: None,
    };

    let arrow_type = ResultSet::exasol_datatype_to_arrow(&exasol_type).unwrap();
    assert_eq!(arrow_type, ArrowDataType::Binary);
}
1632
/// An unrecognised type name (`UNKNOWN_TYPE`) must be rejected with an error.
#[test]
fn test_exasol_datatype_to_arrow_unsupported_type() {
    let exasol_type = DataType {
        type_name: String::from("UNKNOWN_TYPE"),
        precision: None,
        scale: None,
        size: None,
        character_set: None,
        with_local_time_zone: None,
        fraction: None,
    };

    assert!(ResultSet::exasol_datatype_to_arrow(&exasol_type).is_err());
}
1648
1649 #[tokio::test]
1654 async fn test_column_major_to_record_batch_with_int32() {
1655 let schema = Arc::new(Schema::new(vec![Field::new(
1656 "value",
1657 arrow::datatypes::DataType::Int32,
1658 true,
1659 )]));
1660
1661 let data = ResultData {
1662 columns: vec![],
1663 data: vec![
1664 vec![serde_json::json!(1)],
1665 vec![serde_json::json!(2)],
1666 vec![serde_json::json!(null)],
1667 vec![serde_json::json!(4)],
1668 ],
1669 total_rows: 4,
1670 };
1671
1672 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1673
1674 assert_eq!(batch.num_rows(), 4);
1675 assert_eq!(batch.num_columns(), 1);
1676
1677 let array = batch
1678 .column(0)
1679 .as_any()
1680 .downcast_ref::<arrow::array::Int32Array>()
1681 .unwrap();
1682 assert_eq!(array.value(0), 1);
1683 assert_eq!(array.value(1), 2);
1684 assert!(array.is_null(2));
1685 assert_eq!(array.value(3), 4);
1686 }
1687
1688 #[tokio::test]
1689 async fn test_column_major_to_record_batch_with_int64() {
1690 let schema = Arc::new(Schema::new(vec![Field::new(
1691 "value",
1692 arrow::datatypes::DataType::Int64,
1693 true,
1694 )]));
1695
1696 let data = ResultData {
1697 columns: vec![],
1698 data: vec![
1699 vec![serde_json::json!(9223372036854775807_i64)], vec![serde_json::json!(-9223372036854775808_i64)], vec![serde_json::json!(null)],
1702 ],
1703 total_rows: 3,
1704 };
1705
1706 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1707
1708 assert_eq!(batch.num_rows(), 3);
1709
1710 let array = batch
1711 .column(0)
1712 .as_any()
1713 .downcast_ref::<arrow::array::Int64Array>()
1714 .unwrap();
1715 assert_eq!(array.value(0), 9223372036854775807_i64);
1716 assert_eq!(array.value(1), -9223372036854775808_i64);
1717 assert!(array.is_null(2));
1718 }
1719
1720 #[tokio::test]
1721 async fn test_column_major_to_record_batch_with_float64() {
1722 let schema = Arc::new(Schema::new(vec![Field::new(
1723 "value",
1724 arrow::datatypes::DataType::Float64,
1725 true,
1726 )]));
1727
1728 let data = ResultData {
1729 columns: vec![],
1730 data: vec![
1731 vec![serde_json::json!(1.23456)],
1732 vec![serde_json::json!(-9.87654)],
1733 vec![serde_json::json!(null)],
1734 vec![serde_json::json!(0.0)],
1735 ],
1736 total_rows: 4,
1737 };
1738
1739 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1740
1741 assert_eq!(batch.num_rows(), 4);
1742
1743 let array = batch
1744 .column(0)
1745 .as_any()
1746 .downcast_ref::<arrow::array::Float64Array>()
1747 .unwrap();
1748 assert!((array.value(0) - 1.23456).abs() < 0.00001);
1749 assert!((array.value(1) - (-9.87654)).abs() < 0.00001);
1750 assert!(array.is_null(2));
1751 assert!((array.value(3) - 0.0).abs() < 0.00001);
1752 }
1753
1754 #[tokio::test]
1755 async fn test_column_major_to_record_batch_with_boolean() {
1756 let schema = Arc::new(Schema::new(vec![Field::new(
1757 "flag",
1758 arrow::datatypes::DataType::Boolean,
1759 true,
1760 )]));
1761
1762 let data = ResultData {
1763 columns: vec![],
1764 data: vec![
1765 vec![serde_json::json!(true)],
1766 vec![serde_json::json!(false)],
1767 vec![serde_json::json!(null)],
1768 ],
1769 total_rows: 3,
1770 };
1771
1772 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1773
1774 assert_eq!(batch.num_rows(), 3);
1775
1776 let array = batch
1777 .column(0)
1778 .as_any()
1779 .downcast_ref::<arrow::array::BooleanArray>()
1780 .unwrap();
1781 assert!(array.value(0));
1782 assert!(!array.value(1));
1783 assert!(array.is_null(2));
1784 }
1785
1786 #[tokio::test]
1787 async fn test_column_major_to_record_batch_with_date32() {
1788 let schema = Arc::new(Schema::new(vec![Field::new(
1789 "date",
1790 arrow::datatypes::DataType::Date32,
1791 true,
1792 )]));
1793
1794 let data = ResultData {
1795 columns: vec![],
1796 data: vec![
1797 vec![serde_json::json!("1970-01-01")],
1798 vec![serde_json::json!("2000-01-01")],
1799 vec![serde_json::json!(null)],
1800 ],
1801 total_rows: 3,
1802 };
1803
1804 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1805
1806 assert_eq!(batch.num_rows(), 3);
1807
1808 let array = batch
1809 .column(0)
1810 .as_any()
1811 .downcast_ref::<arrow::array::Date32Array>()
1812 .unwrap();
1813 assert_eq!(array.value(0), 0); assert_eq!(array.value(1), 10957); assert!(array.is_null(2));
1816 }
1817
1818 #[tokio::test]
1819 async fn test_column_major_to_record_batch_with_timestamp() {
1820 let schema = Arc::new(Schema::new(vec![Field::new(
1821 "timestamp",
1822 arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
1823 true,
1824 )]));
1825
1826 let data = ResultData {
1827 columns: vec![],
1828 data: vec![
1829 vec![serde_json::json!("1970-01-01 00:00:00")],
1830 vec![serde_json::json!("1970-01-01 01:00:00.123456")],
1831 vec![serde_json::json!(null)],
1832 ],
1833 total_rows: 3,
1834 };
1835
1836 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1837
1838 assert_eq!(batch.num_rows(), 3);
1839
1840 let array = batch
1841 .column(0)
1842 .as_any()
1843 .downcast_ref::<arrow::array::TimestampMicrosecondArray>()
1844 .unwrap();
1845 assert_eq!(array.value(0), 0);
1846 assert_eq!(array.value(1), 3600 * 1_000_000 + 123456);
1848 assert!(array.is_null(2));
1849 }
1850
1851 #[tokio::test]
1852 async fn test_column_major_to_record_batch_empty_data() {
1853 let schema = Arc::new(Schema::new(vec![
1854 Field::new("id", arrow::datatypes::DataType::Int64, true),
1855 Field::new("name", arrow::datatypes::DataType::Utf8, true),
1856 ]));
1857
1858 let data = ResultData {
1859 columns: vec![],
1860 data: vec![],
1861 total_rows: 0,
1862 };
1863
1864 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1865
1866 assert_eq!(batch.num_rows(), 0);
1867 assert_eq!(batch.num_columns(), 2);
1868 }
1869
1870 #[tokio::test]
1871 async fn test_column_major_to_record_batch_with_utf8_string() {
1872 let schema = Arc::new(Schema::new(vec![Field::new(
1873 "text",
1874 arrow::datatypes::DataType::Utf8,
1875 true,
1876 )]));
1877
1878 let data = ResultData {
1879 columns: vec![],
1880 data: vec![
1881 vec![serde_json::json!("hello")],
1882 vec![serde_json::json!("world")],
1883 vec![serde_json::json!(null)],
1884 vec![serde_json::json!("")], ],
1886 total_rows: 4,
1887 };
1888
1889 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1890
1891 assert_eq!(batch.num_rows(), 4);
1892
1893 let array = batch
1894 .column(0)
1895 .as_any()
1896 .downcast_ref::<arrow::array::StringArray>()
1897 .unwrap();
1898 assert_eq!(array.value(0), "hello");
1899 assert_eq!(array.value(1), "world");
1900 assert!(array.is_null(2));
1901 assert_eq!(array.value(3), "");
1902 }
1903
1904 #[tokio::test]
1905 async fn test_column_major_to_record_batch_utf8_from_non_string_value() {
1906 let schema = Arc::new(Schema::new(vec![Field::new(
1908 "text",
1909 arrow::datatypes::DataType::Utf8,
1910 true,
1911 )]));
1912
1913 let data = ResultData {
1914 columns: vec![],
1915 data: vec![
1916 vec![serde_json::json!(123)], ],
1918 total_rows: 1,
1919 };
1920
1921 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1922
1923 let array = batch
1924 .column(0)
1925 .as_any()
1926 .downcast_ref::<arrow::array::StringArray>()
1927 .unwrap();
1928 assert_eq!(array.value(0), "123");
1929 }
1930
1931 #[tokio::test]
1932 async fn test_column_major_to_record_batch_decimal_from_string() {
1933 let schema = Arc::new(Schema::new(vec![Field::new(
1934 "amount",
1935 arrow::datatypes::DataType::Decimal128(18, 2),
1936 true,
1937 )]));
1938
1939 let data = ResultData {
1940 columns: vec![],
1941 data: vec![
1942 vec![serde_json::json!("123.45")],
1943 vec![serde_json::json!("-67.89")],
1944 vec![serde_json::json!(null)],
1945 ],
1946 total_rows: 3,
1947 };
1948
1949 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1950
1951 assert_eq!(batch.num_rows(), 3);
1952
1953 let array = batch
1954 .column(0)
1955 .as_any()
1956 .downcast_ref::<arrow::array::Decimal128Array>()
1957 .unwrap();
1958 assert_eq!(array.value(0), 12345); assert_eq!(array.value(1), -6789); assert!(array.is_null(2));
1961 }
1962
1963 #[tokio::test]
1964 async fn test_column_major_to_record_batch_decimal_from_integer() {
1965 let schema = Arc::new(Schema::new(vec![Field::new(
1966 "amount",
1967 arrow::datatypes::DataType::Decimal128(18, 2),
1968 true,
1969 )]));
1970
1971 let data = ResultData {
1972 columns: vec![],
1973 data: vec![
1974 vec![serde_json::json!(100)], ],
1976 total_rows: 1,
1977 };
1978
1979 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1980
1981 let array = batch
1982 .column(0)
1983 .as_any()
1984 .downcast_ref::<arrow::array::Decimal128Array>()
1985 .unwrap();
1986 assert_eq!(array.value(0), 10000); }
1988
1989 #[tokio::test]
1990 async fn test_column_major_to_record_batch_decimal_from_float() {
1991 let schema = Arc::new(Schema::new(vec![Field::new(
1992 "amount",
1993 arrow::datatypes::DataType::Decimal128(18, 2),
1994 true,
1995 )]));
1996
1997 let data = ResultData {
1998 columns: vec![],
1999 data: vec![
2000 vec![serde_json::json!(99.99)], ],
2002 total_rows: 1,
2003 };
2004
2005 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2006
2007 let array = batch
2008 .column(0)
2009 .as_any()
2010 .downcast_ref::<arrow::array::Decimal128Array>()
2011 .unwrap();
2012 assert_eq!(array.value(0), 9999); }
2014
2015 #[tokio::test]
2016 async fn test_column_major_to_record_batch_invalid_date_becomes_null() {
2017 let schema = Arc::new(Schema::new(vec![Field::new(
2018 "date",
2019 arrow::datatypes::DataType::Date32,
2020 true,
2021 )]));
2022
2023 let data = ResultData {
2024 columns: vec![],
2025 data: vec![
2026 vec![serde_json::json!("invalid-date")],
2027 vec![serde_json::json!(123)], ],
2029 total_rows: 2,
2030 };
2031
2032 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2033
2034 let array = batch
2035 .column(0)
2036 .as_any()
2037 .downcast_ref::<arrow::array::Date32Array>()
2038 .unwrap();
2039 assert!(array.is_null(0));
2041 assert!(array.is_null(1));
2042 }
2043
2044 #[tokio::test]
2045 async fn test_column_major_to_record_batch_invalid_timestamp_becomes_null() {
2046 let schema = Arc::new(Schema::new(vec![Field::new(
2047 "timestamp",
2048 arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
2049 true,
2050 )]));
2051
2052 let data = ResultData {
2053 columns: vec![],
2054 data: vec![
2055 vec![serde_json::json!("not-a-timestamp")],
2056 vec![serde_json::json!(12345)], ],
2058 total_rows: 2,
2059 };
2060
2061 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2062
2063 let array = batch
2064 .column(0)
2065 .as_any()
2066 .downcast_ref::<arrow::array::TimestampMicrosecondArray>()
2067 .unwrap();
2068 assert!(array.is_null(0));
2069 assert!(array.is_null(1));
2070 }
2071
2072 #[tokio::test]
2073 async fn test_column_major_to_record_batch_boolean_null_from_non_bool() {
2074 let schema = Arc::new(Schema::new(vec![Field::new(
2075 "flag",
2076 arrow::datatypes::DataType::Boolean,
2077 true,
2078 )]));
2079
2080 let data = ResultData {
2081 columns: vec![],
2082 data: vec![
2083 vec![serde_json::json!("not-a-bool")], vec![serde_json::json!(123)], ],
2086 total_rows: 2,
2087 };
2088
2089 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2090
2091 let array = batch
2092 .column(0)
2093 .as_any()
2094 .downcast_ref::<arrow::array::BooleanArray>()
2095 .unwrap();
2096 assert!(array.is_null(0));
2098 assert!(array.is_null(1));
2099 }
2100
2101 #[tokio::test]
2102 async fn test_column_major_to_record_batch_int32_null_from_non_int() {
2103 let schema = Arc::new(Schema::new(vec![Field::new(
2104 "value",
2105 arrow::datatypes::DataType::Int32,
2106 true,
2107 )]));
2108
2109 let data = ResultData {
2110 columns: vec![],
2111 data: vec![
2112 vec![serde_json::json!("not-an-int")], ],
2114 total_rows: 1,
2115 };
2116
2117 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2118
2119 let array = batch
2120 .column(0)
2121 .as_any()
2122 .downcast_ref::<arrow::array::Int32Array>()
2123 .unwrap();
2124 assert!(array.is_null(0));
2125 }
2126
2127 #[tokio::test]
2128 async fn test_column_major_to_record_batch_float64_null_from_non_float() {
2129 let schema = Arc::new(Schema::new(vec![Field::new(
2130 "value",
2131 arrow::datatypes::DataType::Float64,
2132 true,
2133 )]));
2134
2135 let data = ResultData {
2136 columns: vec![],
2137 data: vec![
2138 vec![serde_json::json!("not-a-float")], ],
2140 total_rows: 1,
2141 };
2142
2143 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2144
2145 let array = batch
2146 .column(0)
2147 .as_any()
2148 .downcast_ref::<arrow::array::Float64Array>()
2149 .unwrap();
2150 assert!(array.is_null(0));
2151 }
2152
2153 #[tokio::test]
2154 async fn test_column_major_to_record_batch_unsupported_type_returns_error() {
2155 let schema = Arc::new(Schema::new(vec![Field::new(
2158 "large_text",
2159 arrow::datatypes::DataType::LargeUtf8,
2160 true,
2161 )]));
2162
2163 let data = ResultData {
2164 columns: vec![],
2165 data: vec![vec![serde_json::json!("test_data")]],
2166 total_rows: 1,
2167 };
2168
2169 let result = ResultSet::column_major_to_record_batch(&data, &schema);
2170
2171 assert!(result.is_err());
2174 }
2175
2176 #[tokio::test]
2181 async fn test_result_set_iterator_metadata() {
2182 let mock_transport = MockTransport::new();
2183 let transport: Arc<Mutex<dyn TransportProtocol>> = Arc::new(Mutex::new(mock_transport));
2184
2185 let data = ResultData {
2186 columns: vec![ColumnInfo {
2187 name: "id".to_string(),
2188 data_type: DataType {
2189 type_name: "DECIMAL".to_string(),
2190 precision: Some(18),
2191 scale: Some(0),
2192 size: None,
2193 character_set: None,
2194 with_local_time_zone: None,
2195 fraction: None,
2196 },
2197 }],
2198 data: vec![vec![serde_json::json!(1)]],
2199 total_rows: 1,
2200 };
2201
2202 let result = TransportQueryResult::ResultSet {
2203 handle: Some(ResultSetHandle::new(1)),
2204 data,
2205 };
2206
2207 let result_set = ResultSet::from_transport_result(result, transport).unwrap();
2208 let iterator = result_set.into_iterator().unwrap();
2209
2210 let metadata = iterator.metadata();
2211 assert_eq!(metadata.column_count, 1);
2212 assert_eq!(metadata.total_rows, Some(1));
2213 assert_eq!(metadata.column_names(), vec!["id"]);
2214 }
2215
2216 #[tokio::test]
2217 async fn test_result_set_iterator_metadata_multiple_columns() {
2218 let mock_transport = MockTransport::new();
2219 let transport: Arc<Mutex<dyn TransportProtocol>> = Arc::new(Mutex::new(mock_transport));
2220
2221 let data = ResultData {
2222 columns: vec![
2223 ColumnInfo {
2224 name: "id".to_string(),
2225 data_type: DataType {
2226 type_name: "DECIMAL".to_string(),
2227 precision: Some(18),
2228 scale: Some(0),
2229 size: None,
2230 character_set: None,
2231 with_local_time_zone: None,
2232 fraction: None,
2233 },
2234 },
2235 ColumnInfo {
2236 name: "name".to_string(),
2237 data_type: DataType {
2238 type_name: "VARCHAR".to_string(),
2239 precision: None,
2240 scale: None,
2241 size: Some(100),
2242 character_set: Some("UTF8".to_string()),
2243 with_local_time_zone: None,
2244 fraction: None,
2245 },
2246 },
2247 ColumnInfo {
2248 name: "active".to_string(),
2249 data_type: DataType {
2250 type_name: "BOOLEAN".to_string(),
2251 precision: None,
2252 scale: None,
2253 size: None,
2254 character_set: None,
2255 with_local_time_zone: None,
2256 fraction: None,
2257 },
2258 },
2259 ],
2260 data: vec![vec![
2261 serde_json::json!(1),
2262 serde_json::json!("Alice"),
2263 serde_json::json!(true),
2264 ]],
2265 total_rows: 1,
2266 };
2267
2268 let result = TransportQueryResult::ResultSet {
2269 handle: Some(ResultSetHandle::new(1)),
2270 data,
2271 };
2272
2273 let result_set = ResultSet::from_transport_result(result, transport).unwrap();
2274 let iterator = result_set.into_iterator().unwrap();
2275
2276 let metadata = iterator.metadata();
2277 assert_eq!(metadata.column_count, 3);
2278 assert_eq!(metadata.column_names(), vec!["id", "name", "active"]);
2279
2280 let types = metadata.column_types();
2281 assert_eq!(types.len(), 3);
2282 assert!(matches!(
2283 types[0],
2284 arrow::datatypes::DataType::Decimal128(18, 0)
2285 ));
2286 assert!(matches!(types[1], arrow::datatypes::DataType::Utf8));
2287 assert!(matches!(types[2], arrow::datatypes::DataType::Boolean));
2288 }
2289}