1use crate::error::{ConversionError, QueryError};
7use crate::transport::messages::{ColumnInfo, ResultData, ResultSetHandle};
8use crate::transport::protocol::QueryResult as TransportQueryResult;
9use crate::transport::TransportProtocol;
10use crate::types::TypeMapper;
11use arrow::array::RecordBatch;
12use arrow::datatypes::{Field, Schema};
13use std::sync::Arc;
14use tokio::sync::Mutex;
15
/// Metadata describing the shape and provenance of a query's result set.
#[derive(Debug, Clone)]
pub struct QueryMetadata {
    /// Arrow schema derived from the server-reported column descriptors.
    pub schema: Arc<Schema>,
    /// Total row count reported by the server, if known.
    pub total_rows: Option<i64>,
    /// Number of columns in `schema` (cached at construction time).
    pub column_count: usize,
    /// Server-side execution time in milliseconds, if measured.
    pub execution_time_ms: Option<u64>,
}
28
29impl QueryMetadata {
30 pub fn new(schema: Arc<Schema>, total_rows: Option<i64>) -> Self {
32 Self {
33 column_count: schema.fields().len(),
34 schema,
35 total_rows,
36 execution_time_ms: None,
37 }
38 }
39
40 pub fn with_execution_time(mut self, execution_time_ms: u64) -> Self {
42 self.execution_time_ms = Some(execution_time_ms);
43 self
44 }
45
46 pub fn column_names(&self) -> Vec<&str> {
48 self.schema
49 .fields()
50 .iter()
51 .map(|f| f.name().as_str())
52 .collect()
53 }
54
55 pub fn column_types(&self) -> Vec<&arrow::datatypes::DataType> {
57 self.schema.fields().iter().map(|f| f.data_type()).collect()
58 }
59}
60
/// A query result: either an affected-row count (DML/DDL) or a stream of
/// Arrow record batches backed by an optional server-side result-set handle.
pub struct ResultSet {
    // Row-count vs. stream payload; see `ResultSetInner`.
    inner: ResultSetInner,
    // Shared transport used to fetch further chunks and close the handle.
    transport: Arc<Mutex<dyn TransportProtocol>>,
}
68
69impl std::fmt::Debug for ResultSet {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct("ResultSet")
72 .field("inner", &self.inner)
73 .field("transport", &"<TransportProtocol>")
74 .finish()
75 }
76}
77
/// Internal payload of a `ResultSet`.
#[derive(Debug)]
enum ResultSetInner {
    /// DML/DDL result: only the number of affected rows.
    RowCount { count: i64 },
    /// SELECT result: buffered batches plus streaming state.
    Stream {
        /// Server-side handle for fetching further chunks; `None` when the
        /// entire result arrived inline with the first response.
        handle: Option<ResultSetHandle>,
        /// Schema and row-count metadata derived from the first response.
        metadata: QueryMetadata,
        /// Record batches received so far.
        batches: Vec<RecordBatch>,
        /// True once no further chunks remain on the server.
        complete: bool,
    },
}
94
95impl ResultSet {
96 pub(crate) fn from_transport_result(
98 result: TransportQueryResult,
99 transport: Arc<Mutex<dyn TransportProtocol>>,
100 ) -> Result<Self, QueryError> {
101 match result {
102 TransportQueryResult::RowCount { count } => Ok(Self {
103 inner: ResultSetInner::RowCount { count },
104 transport,
105 }),
106 TransportQueryResult::ResultSet { handle, data } => {
107 let schema = Self::build_schema(&data.columns)
108 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
109
110 let metadata = QueryMetadata::new(Arc::clone(&schema), Some(data.total_rows));
111
112 let batches = if !data.data.is_empty() {
115 vec![Self::column_major_to_record_batch(&data, &schema)
116 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?]
117 } else {
118 Vec::new()
119 };
120
121 let num_rows_received = if data.data.is_empty() {
124 0
125 } else {
126 data.data[0].len() as i64
127 };
128 let complete = handle.is_none() || data.total_rows == num_rows_received;
129
130 Ok(Self {
131 inner: ResultSetInner::Stream {
132 handle, metadata,
134 batches,
135 complete,
136 },
137 transport,
138 })
139 }
140 }
141 }
142
143 pub fn row_count(&self) -> Option<i64> {
145 match &self.inner {
146 ResultSetInner::RowCount { count } => Some(*count),
147 _ => None,
148 }
149 }
150
151 pub fn metadata(&self) -> Option<&QueryMetadata> {
153 match &self.inner {
154 ResultSetInner::Stream { metadata, .. } => Some(metadata),
155 _ => None,
156 }
157 }
158
159 pub fn is_stream(&self) -> bool {
161 matches!(&self.inner, ResultSetInner::Stream { .. })
162 }
163
164 pub fn into_iterator(self) -> Result<ResultSetIterator, QueryError> {
169 match self.inner {
170 ResultSetInner::Stream {
171 handle,
172 metadata,
173 batches,
174 complete,
175 } => Ok(ResultSetIterator {
176 handle,
177 transport: self.transport,
178 metadata,
179 batches,
180 current_index: 0,
181 complete,
182 }),
183 ResultSetInner::RowCount { .. } => Err(QueryError::NoResultSet(
184 "Cannot iterate over row count result".to_string(),
185 )),
186 }
187 }
188
    /// Drains the entire result set into memory, returning every batch.
    ///
    /// Already-buffered batches are reused; if the stream is not yet complete,
    /// remaining chunks are fetched until the server returns an empty chunk or
    /// the reported total row count is reached. The server-side handle is then
    /// closed (best effort) before returning.
    ///
    /// # Errors
    /// `QueryError::ExecutionFailed` on transport or conversion failures;
    /// `QueryError::NoResultSet` for row-count results.
    pub async fn fetch_all(mut self) -> Result<Vec<RecordBatch>, QueryError> {
        match &mut self.inner {
            ResultSetInner::Stream {
                handle,
                metadata,
                batches,
                complete,
            } => {
                let all_batches = if *complete {
                    batches.clone()
                } else {
                    let mut all_batches = batches.clone();

                    if let Some(handle_val) = handle {
                        loop {
                            // Lock re-acquired per chunk so other users of the
                            // shared transport can interleave between fetches.
                            let mut transport = self.transport.lock().await;
                            let result_data = transport
                                .fetch_results(*handle_val)
                                .await
                                .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;

                            // An empty chunk signals end-of-stream.
                            if result_data.data.is_empty() {
                                *complete = true;
                                break;
                            }

                            let batch =
                                Self::column_major_to_record_batch(&result_data, &metadata.schema)
                                    .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;

                            all_batches.push(batch);

                            // Stop once accumulated rows reach the reported
                            // total. NOTE(review): if the server reports
                            // total_rows == 0 while still returning data, this
                            // loop only terminates on an empty chunk — confirm
                            // the protocol guarantees one.
                            if result_data.total_rows > 0
                                && all_batches.iter().map(|b| b.num_rows()).sum::<usize>()
                                    >= result_data.total_rows as usize
                            {
                                *complete = true;
                                break;
                            }
                        }
                    }

                    *batches = all_batches.clone();
                    all_batches
                };

                // Best-effort close of the server-side handle; errors ignored
                // deliberately since the data is already in hand.
                if let Some(handle_val) = handle.take() {
                    let mut transport = self.transport.lock().await;
                    let _ = transport.close_result_set(handle_val).await;
                }

                Ok(all_batches)
            }
            ResultSetInner::RowCount { .. } => Err(QueryError::NoResultSet(
                "Cannot fetch batches from row count result".to_string(),
            )),
        }
    }
255
256 fn build_schema(columns: &[ColumnInfo]) -> Result<Arc<Schema>, ConversionError> {
258 let fields: Result<Vec<Field>, ConversionError> = columns
259 .iter()
260 .map(|col| {
261 let arrow_type = Self::exasol_datatype_to_arrow(&col.data_type)?;
263
264 Ok(Field::new(&col.name, arrow_type, true))
266 })
267 .collect();
268
269 Ok(Arc::new(Schema::new(fields?)))
270 }
271
272 fn exasol_datatype_to_arrow(
274 data_type: &crate::transport::messages::DataType,
275 ) -> Result<arrow::datatypes::DataType, ConversionError> {
276 use crate::types::ExasolType;
277
278 let exasol_type = match data_type.type_name.as_str() {
279 "BOOLEAN" => ExasolType::Boolean,
280 "CHAR" => ExasolType::Char {
281 size: data_type.size.unwrap_or(1) as usize,
282 },
283 "VARCHAR" => ExasolType::Varchar {
284 size: data_type.size.unwrap_or(2000000) as usize,
285 },
286 "DECIMAL" => ExasolType::Decimal {
287 precision: data_type.precision.unwrap_or(18) as u8,
288 scale: data_type.scale.unwrap_or(0) as i8,
289 },
290 "DOUBLE" => ExasolType::Double,
291 "DATE" => ExasolType::Date,
292 "TIMESTAMP" => ExasolType::Timestamp {
293 with_local_time_zone: data_type.with_local_time_zone.unwrap_or(false),
294 },
295 "TIMESTAMP WITH LOCAL TIME ZONE" => ExasolType::Timestamp {
296 with_local_time_zone: true,
297 },
298 "INTERVAL YEAR TO MONTH" => ExasolType::IntervalYearToMonth,
299 "INTERVAL DAY TO SECOND" => ExasolType::IntervalDayToSecond {
300 precision: data_type.fraction.unwrap_or(3) as u8,
301 },
302 "GEOMETRY" => ExasolType::Geometry { srid: None },
303 "HASHTYPE" => ExasolType::Hashtype { byte_size: 16 },
304 _ => {
305 return Err(ConversionError::UnsupportedType {
306 exasol_type: data_type.type_name.clone(),
307 })
308 }
309 };
310
311 TypeMapper::exasol_to_arrow(&exasol_type, true)
312 }
313
314 fn column_major_to_record_batch(
321 data: &ResultData,
322 schema: &Arc<Schema>,
323 ) -> Result<RecordBatch, ConversionError> {
324 use arrow::array::*;
325 use serde_json::Value;
326
327 if data.data.is_empty() {
328 let empty_arrays: Vec<Arc<dyn Array>> = schema
330 .fields()
331 .iter()
332 .map(|field| new_empty_array(field.data_type()))
333 .collect();
334
335 return RecordBatch::try_new(Arc::clone(schema), empty_arrays)
336 .map_err(|e| ConversionError::ArrowError(e.to_string()));
337 }
338
339 let num_columns = schema.fields().len();
341 let column_values: Vec<Vec<&Value>> = (0..num_columns)
342 .map(|col_idx| {
343 data.data
344 .iter()
345 .map(|row| row.get(col_idx).unwrap_or(&Value::Null))
346 .collect()
347 })
348 .collect();
349
350 let mut arrays: Vec<Arc<dyn Array>> = Vec::new();
351
352 for (col_idx, field) in schema.fields().iter().enumerate() {
353 use arrow::datatypes::DataType;
354
355 let col_values = &column_values[col_idx];
357
358 let array: Arc<dyn Array> = match field.data_type() {
360 DataType::Boolean => {
361 let mut builder = BooleanBuilder::new();
362 for value in col_values {
363 if value.is_null() {
364 builder.append_null();
365 } else if let Some(b) = value.as_bool() {
366 builder.append_value(b);
367 } else {
368 builder.append_null();
369 }
370 }
371 Arc::new(builder.finish())
372 }
373 DataType::Int32 => {
374 let mut builder = Int32Builder::new();
375 for value in col_values {
376 if value.is_null() {
377 builder.append_null();
378 } else if let Some(i) = value.as_i64() {
379 builder.append_value(i as i32);
380 } else {
381 builder.append_null();
382 }
383 }
384 Arc::new(builder.finish())
385 }
386 DataType::Int64 => {
387 let mut builder = Int64Builder::new();
388 for value in col_values {
389 if value.is_null() {
390 builder.append_null();
391 } else if let Some(i) = value.as_i64() {
392 builder.append_value(i);
393 } else {
394 builder.append_null();
395 }
396 }
397 Arc::new(builder.finish())
398 }
399 DataType::Float64 => {
400 let mut builder = Float64Builder::new();
401 for value in col_values {
402 if value.is_null() {
403 builder.append_null();
404 } else if let Some(f) = value.as_f64() {
405 builder.append_value(f);
406 } else {
407 builder.append_null();
408 }
409 }
410 Arc::new(builder.finish())
411 }
412 DataType::Utf8 => {
413 let mut builder = StringBuilder::new();
414 for value in col_values {
415 if value.is_null() {
416 builder.append_null();
417 } else if let Some(s) = value.as_str() {
418 builder.append_value(s);
419 } else {
420 builder.append_value(value.to_string());
422 }
423 }
424 Arc::new(builder.finish())
425 }
426 DataType::Decimal128(precision, scale) => {
427 let mut builder = Decimal128Builder::new()
428 .with_precision_and_scale(*precision, *scale)
429 .map_err(|e| ConversionError::ArrowError(e.to_string()))?;
430
431 for value in col_values {
432 if value.is_null() {
433 builder.append_null();
434 } else if let Some(s) = value.as_str() {
435 let scaled = Self::parse_string_to_decimal(s, *scale)?;
437 builder.append_value(scaled);
438 } else if let Some(i) = value.as_i64() {
439 let scaled = i * 10i64.pow(*scale as u32);
441 builder.append_value(scaled as i128);
442 } else if let Some(f) = value.as_f64() {
443 let scaled = (f * 10f64.powi(*scale as i32)) as i128;
444 builder.append_value(scaled);
445 } else {
446 builder.append_null();
447 }
448 }
449 Arc::new(builder.finish())
450 }
451 DataType::Date32 => {
452 let mut builder = Date32Builder::new();
453 for value in col_values {
454 if value.is_null() {
455 builder.append_null();
456 } else if let Some(s) = value.as_str() {
457 match Self::parse_date_to_days(s) {
459 Ok(days) => builder.append_value(days),
460 Err(_) => builder.append_null(),
461 }
462 } else {
463 builder.append_null();
464 }
465 }
466 Arc::new(builder.finish())
467 }
468 DataType::Timestamp(_, tz) => {
469 let mut builder = TimestampMicrosecondBuilder::new();
470 for value in col_values {
471 if value.is_null() {
472 builder.append_null();
473 } else if let Some(s) = value.as_str() {
474 match Self::parse_timestamp_to_micros(s) {
475 Ok(micros) => builder.append_value(micros),
476 Err(_) => builder.append_null(),
477 }
478 } else {
479 builder.append_null();
480 }
481 }
482 let array = builder.finish();
483 if tz.is_some() {
484 Arc::new(array.with_timezone("UTC"))
485 } else {
486 Arc::new(array)
487 }
488 }
489 _ => {
490 let mut builder = StringBuilder::new();
492 for value in col_values {
493 if value.is_null() {
494 builder.append_null();
495 } else {
496 builder.append_value(value.to_string());
497 }
498 }
499 Arc::new(builder.finish())
500 }
501 };
502
503 arrays.push(array);
504 }
505
506 RecordBatch::try_new(Arc::clone(schema), arrays)
507 .map_err(|e| ConversionError::ArrowError(e.to_string()))
508 }
509
510 fn parse_date_to_days(date_str: &str) -> Result<i32, ()> {
512 let parts: Vec<&str> = date_str.split('-').collect();
513 if parts.len() != 3 {
514 return Err(());
515 }
516
517 let year: i32 = parts[0].parse().map_err(|_| ())?;
518 let month: u32 = parts[1].parse().map_err(|_| ())?;
519 let day: u32 = parts[2].parse().map_err(|_| ())?;
520
521 if !(1..=12).contains(&month) || !(1..=31).contains(&day) {
522 return Err(());
523 }
524
525 let days_from_year =
527 (year - 1970) * 365 + (year - 1969) / 4 - (year - 1901) / 100 + (year - 1601) / 400;
528 let days_from_month = match month {
529 1 => 0,
530 2 => 31,
531 3 => 59,
532 4 => 90,
533 5 => 120,
534 6 => 151,
535 7 => 181,
536 8 => 212,
537 9 => 243,
538 10 => 273,
539 11 => 304,
540 12 => 334,
541 _ => return Err(()),
542 };
543
544 let is_leap_year = (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0);
546 let leap_adjustment = if month > 2 && is_leap_year { 1 } else { 0 };
547
548 Ok(days_from_year + days_from_month + day as i32 - 1 + leap_adjustment)
549 }
550
    /// Converts an Exasol `YYYY-MM-DD[ HH:MM[:SS[.frac]]]` timestamp string
    /// to microseconds since the Unix epoch.
    ///
    /// Quirks preserved deliberately (callers map parse failures to null):
    /// - Anything after the first space-separated time token is ignored.
    /// - A time token without a ':' (e.g. "… 05") is silently ignored.
    /// - A non-numeric fractional part parses as 0 (`unwrap_or(0)`).
    /// - The fraction is truncated (not rounded) to microsecond precision.
    fn parse_timestamp_to_micros(timestamp_str: &str) -> Result<i64, ()> {
        let parts: Vec<&str> = timestamp_str.split(' ').collect();
        if parts.is_empty() {
            return Err(());
        }

        // Whole days contribute 86_400 * 10^6 micros each (may be negative
        // for pre-epoch dates; time-of-day is still added forward).
        let days = Self::parse_date_to_days(parts[0])?;
        let mut micros = days as i64 * 86400 * 1_000_000;

        if parts.len() > 1 {
            let time_parts: Vec<&str> = parts[1].split(':').collect();
            if time_parts.len() >= 2 {
                let hours: i64 = time_parts[0].parse().map_err(|_| ())?;
                let minutes: i64 = time_parts[1].parse().map_err(|_| ())?;

                micros += hours * 3600 * 1_000_000;
                micros += minutes * 60 * 1_000_000;

                if time_parts.len() >= 3 {
                    // Seconds may carry a fractional part: "SS.ffffff".
                    let sec_parts: Vec<&str> = time_parts[2].split('.').collect();
                    let seconds: i64 = sec_parts[0].parse().map_err(|_| ())?;

                    micros += seconds * 1_000_000;

                    if sec_parts.len() > 1 {
                        let frac = sec_parts[1];
                        // Right-pad to 6 digits, or truncate past 6 digits.
                        // Byte slicing `frac[..6]` assumes ASCII digits.
                        let frac_micros = if frac.len() <= 6 {
                            let padding = 6 - frac.len();
                            let padded = format!("{}{}", frac, "0".repeat(padding));
                            padded.parse::<i64>().unwrap_or(0)
                        } else {
                            frac[..6].parse::<i64>().unwrap_or(0)
                        };
                        micros += frac_micros;
                    }
                }
            }
        }

        Ok(micros)
    }
598
599 fn parse_string_to_decimal(s: &str, scale: i8) -> Result<i128, ConversionError> {
603 if s.is_empty() {
605 return Err(ConversionError::InvalidFormat(
606 "Empty decimal string".to_string(),
607 ));
608 }
609
610 let parts: Vec<&str> = s.split('.').collect();
612
613 let (integer_part, decimal_part) = match parts.len() {
614 1 => (parts[0], ""),
615 2 => (parts[0], parts[1]),
616 _ => {
617 return Err(ConversionError::InvalidFormat(format!(
618 "Invalid decimal format: {}",
619 s
620 )));
621 }
622 };
623
624 let mut result: i128 = integer_part.parse().map_err(|_| {
626 ConversionError::InvalidFormat(format!("Invalid integer part: {}", integer_part))
627 })?;
628
629 result = result
631 .checked_mul(10_i128.pow(scale as u32))
632 .ok_or_else(|| ConversionError::InvalidFormat("Decimal overflow".to_string()))?;
633
634 if !decimal_part.is_empty() {
636 let decimal_digits = decimal_part.len().min(scale as usize);
637 let decimal_value: i128 = decimal_part[..decimal_digits].parse().map_err(|_| {
638 ConversionError::InvalidFormat(format!("Invalid decimal part: {}", decimal_part))
639 })?;
640
641 let scale_diff = scale as usize - decimal_digits;
643 let scaled_decimal = decimal_value * 10_i128.pow(scale_diff as u32);
644
645 result = result
646 .checked_add(if integer_part.starts_with('-') {
647 -scaled_decimal
648 } else {
649 scaled_decimal
650 })
651 .ok_or_else(|| ConversionError::InvalidFormat("Decimal overflow".to_string()))?;
652 }
653
654 Ok(result)
655 }
656}
657
/// Synchronous batch iterator over a streaming result set.
pub struct ResultSetIterator {
    // Server-side handle for fetching further chunks; `None` once closed or
    // when the full result arrived inline.
    handle: Option<ResultSetHandle>,
    // Shared transport used for fetches and for closing the handle.
    transport: Arc<Mutex<dyn TransportProtocol>>,
    // Schema/row-count metadata from the initial response.
    metadata: QueryMetadata,
    // Batches received so far (grows as new chunks are fetched).
    batches: Vec<RecordBatch>,
    // Index of the next buffered batch to hand out.
    current_index: usize,
    // True once no further chunks remain on the server.
    complete: bool,
}
675
676impl ResultSetIterator {
    /// Schema and row-count metadata for the underlying result set.
    pub fn metadata(&self) -> &QueryMetadata {
        &self.metadata
    }
681
682 async fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>, QueryError> {
684 if self.complete {
685 return Ok(None);
686 }
687
688 let handle = match self.handle {
689 Some(h) => h,
690 None => {
691 self.complete = true;
692 return Ok(None);
693 }
694 };
695
696 let mut transport = self.transport.lock().await;
697 let result_data = transport
698 .fetch_results(handle)
699 .await
700 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
701
702 if result_data.data.is_empty() {
703 self.complete = true;
704 return Ok(None);
705 }
706
707 let batch = ResultSet::column_major_to_record_batch(&result_data, &self.metadata.schema)
708 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
709
710 Ok(Some(batch))
711 }
712
713 pub fn next_batch(&mut self) -> Option<Result<RecordBatch, QueryError>> {
717 if self.current_index < self.batches.len() {
719 let batch = self.batches[self.current_index].clone();
720 self.current_index += 1;
721 return Some(Ok(batch));
722 }
723
724 if self.complete {
726 return None;
727 }
728
729 let runtime = tokio::runtime::Handle::try_current();
731 if let Ok(handle) = runtime {
732 let result = handle.block_on(self.fetch_next_batch());
733 match result {
734 Ok(Some(batch)) => {
735 self.batches.push(batch.clone());
736 self.current_index += 1;
737 Some(Ok(batch))
738 }
739 Ok(None) => None,
740 Err(e) => Some(Err(e)),
741 }
742 } else {
743 Some(Err(QueryError::InvalidState(
745 "No async runtime available".to_string(),
746 )))
747 }
748 }
749
750 pub async fn close(mut self) -> Result<(), QueryError> {
752 if let Some(handle) = self.handle.take() {
753 let mut transport = self.transport.lock().await;
754 transport
755 .close_result_set(handle)
756 .await
757 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
758 }
759 Ok(())
760 }
761}
762
763impl Iterator for ResultSetIterator {
765 type Item = Result<RecordBatch, QueryError>;
766
767 fn next(&mut self) -> Option<Self::Item> {
768 self.next_batch()
769 }
770}
771
772#[cfg(test)]
773mod tests {
774 use super::*;
775 use crate::transport::messages::{ColumnInfo, DataType};
776 use crate::transport::protocol::{
777 PreparedStatementHandle, QueryResult as TransportQueryResult,
778 };
779 use arrow::array::Array;
780 use async_trait::async_trait;
781 use mockall::mock;
782
    // Mock transport generated by `mockall`: every `TransportProtocol` method
    // is mockable so tests run without a live Exasol connection.
    mock! {
        pub Transport {}

        #[async_trait]
        impl TransportProtocol for Transport {
            async fn connect(&mut self, params: &crate::transport::protocol::ConnectionParams) -> Result<(), crate::error::TransportError>;
            async fn authenticate(&mut self, credentials: &crate::transport::protocol::Credentials) -> Result<crate::transport::messages::SessionInfo, crate::error::TransportError>;
            async fn execute_query(&mut self, sql: &str) -> Result<TransportQueryResult, crate::error::TransportError>;
            async fn fetch_results(&mut self, handle: ResultSetHandle) -> Result<ResultData, crate::error::TransportError>;
            async fn close_result_set(&mut self, handle: ResultSetHandle) -> Result<(), crate::error::TransportError>;
            async fn create_prepared_statement(&mut self, sql: &str) -> Result<PreparedStatementHandle, crate::error::TransportError>;
            async fn execute_prepared_statement(&mut self, handle: &PreparedStatementHandle, parameters: Option<Vec<Vec<serde_json::Value>>>) -> Result<TransportQueryResult, crate::error::TransportError>;
            async fn close_prepared_statement(&mut self, handle: &PreparedStatementHandle) -> Result<(), crate::error::TransportError>;
            async fn close(&mut self) -> Result<(), crate::error::TransportError>;
            fn is_connected(&self) -> bool;
            async fn set_autocommit(&mut self, enabled: bool) -> Result<(), crate::error::TransportError>;
        }
    }
802
    // --- ResultSet construction: row-count vs. stream variants ---

    #[tokio::test]
    async fn test_result_set_row_count() {
        let mock_transport = MockTransport::new();
        let transport: Arc<Mutex<dyn TransportProtocol>> = Arc::new(Mutex::new(mock_transport));

        let result = TransportQueryResult::RowCount { count: 42 };
        let result_set = ResultSet::from_transport_result(result, transport).unwrap();

        // A row-count result exposes the count but no stream metadata.
        assert_eq!(result_set.row_count(), Some(42));
        assert!(!result_set.is_stream());
        assert!(result_set.metadata().is_none());
    }

    #[tokio::test]
    async fn test_result_set_stream() {
        let mock_transport = MockTransport::new();
        let transport: Arc<Mutex<dyn TransportProtocol>> = Arc::new(Mutex::new(mock_transport));

        // Two columns (DECIMAL id, VARCHAR name), two rows in row-major order.
        let data = ResultData {
            columns: vec![
                ColumnInfo {
                    name: "id".to_string(),
                    data_type: DataType {
                        type_name: "DECIMAL".to_string(),
                        precision: Some(18),
                        scale: Some(0),
                        size: None,
                        character_set: None,
                        with_local_time_zone: None,
                        fraction: None,
                    },
                },
                ColumnInfo {
                    name: "name".to_string(),
                    data_type: DataType {
                        type_name: "VARCHAR".to_string(),
                        precision: None,
                        scale: None,
                        size: Some(100),
                        character_set: Some("UTF8".to_string()),
                        with_local_time_zone: None,
                        fraction: None,
                    },
                },
            ],
            data: vec![
                vec![serde_json::json!(1), serde_json::json!("Alice")],
                vec![serde_json::json!(2), serde_json::json!("Bob")],
            ],
            total_rows: 2,
        };

        let result = TransportQueryResult::ResultSet {
            handle: Some(ResultSetHandle::new(1)),
            data,
        };

        let result_set = ResultSet::from_transport_result(result, transport).unwrap();

        // A stream result exposes metadata but no row count.
        assert!(result_set.row_count().is_none());
        assert!(result_set.is_stream());

        let metadata = result_set.metadata().unwrap();
        assert_eq!(metadata.column_count, 2);
        assert_eq!(metadata.total_rows, Some(2));
        assert_eq!(metadata.column_names(), vec!["id", "name"]);
    }
873
    // --- Row-major JSON -> RecordBatch conversion, various shapes ---

    #[tokio::test]
    async fn test_result_set_to_record_batch() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", arrow::datatypes::DataType::Decimal128(18, 0), true),
            Field::new("name", arrow::datatypes::DataType::Utf8, true),
            Field::new("active", arrow::datatypes::DataType::Boolean, true),
        ]));

        // `columns` may be empty here: conversion uses the schema, not the
        // wire descriptors.
        let data = ResultData {
            columns: vec![],
            data: vec![
                vec![
                    serde_json::json!(1),
                    serde_json::json!("Alice"),
                    serde_json::json!(true),
                ],
                vec![
                    serde_json::json!(2),
                    serde_json::json!("Bob"),
                    serde_json::json!(false),
                ],
            ],
            total_rows: 2,
        };

        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();

        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 3);
    }

    #[tokio::test]
    async fn test_single_row_single_column() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "answer",
            arrow::datatypes::DataType::Decimal128(18, 0),
            true,
        )]));

        let data = ResultData {
            columns: vec![],
            data: vec![vec![serde_json::json!(42)]],
            total_rows: 1,
        };

        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();

        assert_eq!(batch.num_rows(), 1, "Should have exactly 1 row");
        assert_eq!(batch.num_columns(), 1, "Should have exactly 1 column");
    }

    #[tokio::test]
    async fn test_single_row_two_columns() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "answer",
                arrow::datatypes::DataType::Decimal128(18, 0),
                true,
            ),
            Field::new("greeting", arrow::datatypes::DataType::Utf8, true),
        ]));

        let data = ResultData {
            columns: vec![],
            data: vec![vec![serde_json::json!(42), serde_json::json!("hello")]],
            total_rows: 1,
        };

        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();

        assert_eq!(batch.num_rows(), 1, "Should have exactly 1 row");
        assert_eq!(batch.num_columns(), 2, "Should have exactly 2 columns");
    }

    #[tokio::test]
    async fn test_ten_rows_two_columns() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", arrow::datatypes::DataType::Decimal128(18, 0), true),
            Field::new("label", arrow::datatypes::DataType::Utf8, true),
        ]));

        // Non-square shape (10 rows x 2 columns) — outer Vec is rows.
        let data = ResultData {
            columns: vec![],
            data: vec![
                vec![serde_json::json!(1), serde_json::json!("Row 1")],
                vec![serde_json::json!(2), serde_json::json!("Row 2")],
                vec![serde_json::json!(3), serde_json::json!("Row 3")],
                vec![serde_json::json!(4), serde_json::json!("Row 4")],
                vec![serde_json::json!(5), serde_json::json!("Row 5")],
                vec![serde_json::json!(6), serde_json::json!("Row 6")],
                vec![serde_json::json!(7), serde_json::json!("Row 7")],
                vec![serde_json::json!(8), serde_json::json!("Row 8")],
                vec![serde_json::json!(9), serde_json::json!("Row 9")],
                vec![serde_json::json!(10), serde_json::json!("Row 10")],
            ],
            total_rows: 10,
        };

        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();

        assert_eq!(batch.num_rows(), 10, "Should have exactly 10 rows");
        assert_eq!(batch.num_columns(), 2, "Should have exactly 2 columns");
    }
990
    // --- Schema building from wire descriptors, QueryMetadata accessors ---

    #[tokio::test]
    async fn test_schema_building() {
        let columns = vec![
            ColumnInfo {
                name: "id".to_string(),
                data_type: DataType {
                    type_name: "DECIMAL".to_string(),
                    precision: Some(18),
                    scale: Some(0),
                    size: None,
                    character_set: None,
                    with_local_time_zone: None,
                    fraction: None,
                },
            },
            ColumnInfo {
                name: "name".to_string(),
                data_type: DataType {
                    type_name: "VARCHAR".to_string(),
                    precision: None,
                    scale: None,
                    size: Some(100),
                    character_set: Some("UTF8".to_string()),
                    with_local_time_zone: None,
                    fraction: None,
                },
            },
            ColumnInfo {
                name: "created_at".to_string(),
                data_type: DataType {
                    type_name: "TIMESTAMP".to_string(),
                    precision: None,
                    scale: None,
                    size: None,
                    character_set: None,
                    with_local_time_zone: Some(false),
                    fraction: None,
                },
            },
        ];

        let schema = ResultSet::build_schema(&columns).unwrap();

        // Field names must follow descriptor order.
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(1).name(), "name");
        assert_eq!(schema.field(2).name(), "created_at");
    }

    #[test]
    fn test_query_metadata() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", arrow::datatypes::DataType::Int64, false),
            Field::new("name", arrow::datatypes::DataType::Utf8, true),
        ]));

        let metadata = QueryMetadata::new(Arc::clone(&schema), Some(100)).with_execution_time(250);

        assert_eq!(metadata.column_count, 2);
        assert_eq!(metadata.total_rows, Some(100));
        assert_eq!(metadata.execution_time_ms, Some(250));
        assert_eq!(metadata.column_names(), vec!["id", "name"]);
    }

    #[test]
    fn test_query_metadata_column_types_returns_correct_types() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", arrow::datatypes::DataType::Int64, false),
            Field::new("name", arrow::datatypes::DataType::Utf8, true),
            Field::new("active", arrow::datatypes::DataType::Boolean, true),
        ]));

        let metadata = QueryMetadata::new(Arc::clone(&schema), Some(100));
        let types = metadata.column_types();

        assert_eq!(types.len(), 3);
        assert_eq!(types[0], &arrow::datatypes::DataType::Int64);
        assert_eq!(types[1], &arrow::datatypes::DataType::Utf8);
        assert_eq!(types[2], &arrow::datatypes::DataType::Boolean);
    }

    #[test]
    fn test_query_metadata_column_types_empty_schema() {
        // Zero-column schemas must produce an empty type list, not panic.
        let fields: Vec<Field> = vec![];
        let schema = Arc::new(Schema::new(fields));

        let metadata = QueryMetadata::new(Arc::clone(&schema), None);
        let types = metadata.column_types();

        assert!(types.is_empty());
    }
1086
    // --- parse_date_to_days: epoch anchoring, leap years, invalid input ---

    #[test]
    fn test_parse_date_to_days_unix_epoch() {
        let result = ResultSet::parse_date_to_days("1970-01-01").unwrap();
        assert_eq!(result, 0);
    }

    #[test]
    fn test_parse_date_to_days_after_epoch() {
        let result = ResultSet::parse_date_to_days("1970-01-02").unwrap();
        assert_eq!(result, 1);
    }

    #[test]
    fn test_parse_date_to_days_year_2000() {
        let result = ResultSet::parse_date_to_days("2000-01-01").unwrap();
        assert_eq!(result, 10957);
    }

    #[test]
    fn test_parse_date_to_days_leap_year() {
        // 2000 is a leap year (divisible by 400): March 1 includes Feb 29.
        let result = ResultSet::parse_date_to_days("2000-03-01").unwrap();
        assert_eq!(result, 11017);
    }

    #[test]
    fn test_parse_date_to_days_non_leap_year() {
        let result = ResultSet::parse_date_to_days("2001-03-01").unwrap();
        assert_eq!(result, 11382);
    }

    #[test]
    fn test_parse_date_to_days_before_epoch() {
        // Pre-epoch dates yield negative day counts.
        let result = ResultSet::parse_date_to_days("1969-12-31").unwrap();
        assert_eq!(result, -1);
    }

    #[test]
    fn test_parse_date_to_days_invalid_format_wrong_separator() {
        let result = ResultSet::parse_date_to_days("2000/01/01");
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_date_to_days_invalid_format_missing_parts() {
        let result = ResultSet::parse_date_to_days("2000-01");
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_date_to_days_invalid_month_zero() {
        let result = ResultSet::parse_date_to_days("2000-00-01");
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_date_to_days_invalid_month_thirteen() {
        let result = ResultSet::parse_date_to_days("2000-13-01");
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_date_to_days_invalid_day_zero() {
        let result = ResultSet::parse_date_to_days("2000-01-00");
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_date_to_days_invalid_day_thirty_two() {
        let result = ResultSet::parse_date_to_days("2000-01-32");
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_date_to_days_invalid_non_numeric() {
        let result = ResultSet::parse_date_to_days("YYYY-MM-DD");
        assert!(result.is_err());
    }
1176
    // --- parse_timestamp_to_micros: time components and fractions ---

    #[test]
    fn test_parse_timestamp_to_micros_date_only() {
        let result = ResultSet::parse_timestamp_to_micros("1970-01-01").unwrap();
        assert_eq!(result, 0);
    }

    #[test]
    fn test_parse_timestamp_to_micros_with_time() {
        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 01:00:00").unwrap();
        assert_eq!(result, 3600 * 1_000_000);
    }

    #[test]
    fn test_parse_timestamp_to_micros_with_minutes() {
        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:30:00").unwrap();
        assert_eq!(result, 30 * 60 * 1_000_000);
    }

    #[test]
    fn test_parse_timestamp_to_micros_with_seconds() {
        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:45").unwrap();
        assert_eq!(result, 45 * 1_000_000);
    }

    #[test]
    fn test_parse_timestamp_to_micros_with_fractional_seconds_3_digits() {
        // Short fractions are right-padded to microseconds: .123 -> 123000.
        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:00.123").unwrap();
        assert_eq!(result, 123000);
    }

    #[test]
    fn test_parse_timestamp_to_micros_with_fractional_seconds_6_digits() {
        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:00.123456").unwrap();
        assert_eq!(result, 123456);
    }

    #[test]
    fn test_parse_timestamp_to_micros_with_fractional_seconds_more_than_6_digits() {
        // Digits past microsecond precision are truncated, not rounded.
        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:00.1234567").unwrap();
        assert_eq!(result, 123456);
    }

    #[test]
    fn test_parse_timestamp_to_micros_with_fractional_seconds_1_digit() {
        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:00.1").unwrap();
        assert_eq!(result, 100000);
    }

    #[test]
    fn test_parse_timestamp_to_micros_complex_timestamp() {
        let result = ResultSet::parse_timestamp_to_micros("2000-06-15 12:30:45.500").unwrap();

        // Cross-check against the date parser plus hand-computed time-of-day.
        let days_micros: i64 =
            ResultSet::parse_date_to_days("2000-06-15").unwrap() as i64 * 86400 * 1_000_000;
        let time_micros: i64 = (12 * 3600 + 30 * 60 + 45) * 1_000_000 + 500000;
        assert_eq!(result, days_micros + time_micros);
    }

    #[test]
    fn test_parse_timestamp_to_micros_empty_string() {
        let result = ResultSet::parse_timestamp_to_micros("");
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_timestamp_to_micros_hours_and_minutes_only() {
        // Seconds are optional: "HH:MM" alone is accepted.
        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 01:30").unwrap();
        assert_eq!(result, 5400 * 1_000_000);
    }
1264
1265 #[test]
1270 fn test_parse_string_to_decimal_integer() {
1271 let result = ResultSet::parse_string_to_decimal("123", 2).unwrap();
1272 assert_eq!(result, 12300);
1274 }
1275
1276 #[test]
1277 fn test_parse_string_to_decimal_with_decimal_point() {
1278 let result = ResultSet::parse_string_to_decimal("123.45", 2).unwrap();
1279 assert_eq!(result, 12345);
1281 }
1282
1283 #[test]
1284 fn test_parse_string_to_decimal_negative() {
1285 let result = ResultSet::parse_string_to_decimal("-123.45", 2).unwrap();
1286 assert_eq!(result, -12345);
1288 }
1289
1290 #[test]
1291 fn test_parse_string_to_decimal_zero_scale() {
1292 let result = ResultSet::parse_string_to_decimal("123", 0).unwrap();
1293 assert_eq!(result, 123);
1294 }
1295
1296 #[test]
1297 fn test_parse_string_to_decimal_high_scale() {
1298 let result = ResultSet::parse_string_to_decimal("1.5", 6).unwrap();
1299 assert_eq!(result, 1500000);
1301 }
1302
1303 #[test]
1304 fn test_parse_string_to_decimal_truncates_extra_decimals() {
1305 let result = ResultSet::parse_string_to_decimal("1.123456", 3).unwrap();
1307 assert_eq!(result, 1123);
1309 }
1310
1311 #[test]
1312 fn test_parse_string_to_decimal_zero() {
1313 let result = ResultSet::parse_string_to_decimal("0", 2).unwrap();
1314 assert_eq!(result, 0);
1315 }
1316
1317 #[test]
1318 fn test_parse_string_to_decimal_negative_zero() {
1319 let result = ResultSet::parse_string_to_decimal("-0", 2).unwrap();
1320 assert_eq!(result, 0);
1321 }
1322
1323 #[test]
1324 fn test_parse_string_to_decimal_empty_string() {
1325 let result = ResultSet::parse_string_to_decimal("", 2);
1326 assert!(result.is_err());
1327 }
1328
1329 #[test]
1330 fn test_parse_string_to_decimal_multiple_decimal_points() {
1331 let result = ResultSet::parse_string_to_decimal("1.2.3", 2);
1332 assert!(result.is_err());
1333 }
1334
1335 #[test]
1336 fn test_parse_string_to_decimal_non_numeric() {
1337 let result = ResultSet::parse_string_to_decimal("abc", 2);
1338 assert!(result.is_err());
1339 }
1340
1341 #[test]
1342 fn test_parse_string_to_decimal_large_value() {
1343 let result = ResultSet::parse_string_to_decimal("999999999999999999", 0).unwrap();
1344 assert_eq!(result, 999999999999999999_i128);
1345 }
1346
1347 #[test]
1352 fn test_exasol_datatype_to_arrow_boolean() {
1353 let data_type = DataType {
1354 type_name: "BOOLEAN".to_string(),
1355 precision: None,
1356 scale: None,
1357 size: None,
1358 character_set: None,
1359 with_local_time_zone: None,
1360 fraction: None,
1361 };
1362
1363 let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1364 assert_eq!(result, arrow::datatypes::DataType::Boolean);
1365 }
1366
1367 #[test]
1368 fn test_exasol_datatype_to_arrow_char() {
1369 let data_type = DataType {
1370 type_name: "CHAR".to_string(),
1371 precision: None,
1372 scale: None,
1373 size: Some(10),
1374 character_set: None,
1375 with_local_time_zone: None,
1376 fraction: None,
1377 };
1378
1379 let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1380 assert_eq!(result, arrow::datatypes::DataType::Utf8);
1381 }
1382
1383 #[test]
1384 fn test_exasol_datatype_to_arrow_char_default_size() {
1385 let data_type = DataType {
1386 type_name: "CHAR".to_string(),
1387 precision: None,
1388 scale: None,
1389 size: None, character_set: None,
1391 with_local_time_zone: None,
1392 fraction: None,
1393 };
1394
1395 let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1396 assert_eq!(result, arrow::datatypes::DataType::Utf8);
1397 }
1398
1399 #[test]
1400 fn test_exasol_datatype_to_arrow_varchar() {
1401 let data_type = DataType {
1402 type_name: "VARCHAR".to_string(),
1403 precision: None,
1404 scale: None,
1405 size: Some(100),
1406 character_set: None,
1407 with_local_time_zone: None,
1408 fraction: None,
1409 };
1410
1411 let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1412 assert_eq!(result, arrow::datatypes::DataType::Utf8);
1413 }
1414
1415 #[test]
1416 fn test_exasol_datatype_to_arrow_varchar_default_size() {
1417 let data_type = DataType {
1418 type_name: "VARCHAR".to_string(),
1419 precision: None,
1420 scale: None,
1421 size: None, character_set: None,
1423 with_local_time_zone: None,
1424 fraction: None,
1425 };
1426
1427 let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1428 assert_eq!(result, arrow::datatypes::DataType::Utf8);
1429 }
1430
1431 #[test]
1432 fn test_exasol_datatype_to_arrow_decimal() {
1433 let data_type = DataType {
1434 type_name: "DECIMAL".to_string(),
1435 precision: Some(18),
1436 scale: Some(2),
1437 size: None,
1438 character_set: None,
1439 with_local_time_zone: None,
1440 fraction: None,
1441 };
1442
1443 let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1444 assert_eq!(result, arrow::datatypes::DataType::Decimal128(18, 2));
1445 }
1446
1447 #[test]
1448 fn test_exasol_datatype_to_arrow_decimal_default_precision_scale() {
1449 let data_type = DataType {
1450 type_name: "DECIMAL".to_string(),
1451 precision: None, scale: None, size: None,
1454 character_set: None,
1455 with_local_time_zone: None,
1456 fraction: None,
1457 };
1458
1459 let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1460 assert_eq!(result, arrow::datatypes::DataType::Decimal128(18, 0));
1461 }
1462
1463 #[test]
1464 fn test_exasol_datatype_to_arrow_double() {
1465 let data_type = DataType {
1466 type_name: "DOUBLE".to_string(),
1467 precision: None,
1468 scale: None,
1469 size: None,
1470 character_set: None,
1471 with_local_time_zone: None,
1472 fraction: None,
1473 };
1474
1475 let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1476 assert_eq!(result, arrow::datatypes::DataType::Float64);
1477 }
1478
1479 #[test]
1480 fn test_exasol_datatype_to_arrow_date() {
1481 let data_type = DataType {
1482 type_name: "DATE".to_string(),
1483 precision: None,
1484 scale: None,
1485 size: None,
1486 character_set: None,
1487 with_local_time_zone: None,
1488 fraction: None,
1489 };
1490
1491 let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1492 assert_eq!(result, arrow::datatypes::DataType::Date32);
1493 }
1494
1495 #[test]
1496 fn test_exasol_datatype_to_arrow_timestamp_without_tz() {
1497 let data_type = DataType {
1498 type_name: "TIMESTAMP".to_string(),
1499 precision: None,
1500 scale: None,
1501 size: None,
1502 character_set: None,
1503 with_local_time_zone: Some(false),
1504 fraction: None,
1505 };
1506
1507 let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1508 assert!(matches!(
1509 result,
1510 arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None)
1511 ));
1512 }
1513
1514 #[test]
1515 fn test_exasol_datatype_to_arrow_timestamp_with_tz() {
1516 let data_type = DataType {
1517 type_name: "TIMESTAMP".to_string(),
1518 precision: None,
1519 scale: None,
1520 size: None,
1521 character_set: None,
1522 with_local_time_zone: Some(true),
1523 fraction: None,
1524 };
1525
1526 let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1527 assert!(matches!(
1528 result,
1529 arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, Some(_))
1530 ));
1531 }
1532
1533 #[test]
1534 fn test_exasol_datatype_to_arrow_timestamp_default_tz() {
1535 let data_type = DataType {
1536 type_name: "TIMESTAMP".to_string(),
1537 precision: None,
1538 scale: None,
1539 size: None,
1540 character_set: None,
1541 with_local_time_zone: None, fraction: None,
1543 };
1544
1545 let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1546 assert!(matches!(
1547 result,
1548 arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None)
1549 ));
1550 }
1551
1552 #[test]
1553 fn test_exasol_datatype_to_arrow_interval_year_to_month() {
1554 let data_type = DataType {
1555 type_name: "INTERVAL YEAR TO MONTH".to_string(),
1556 precision: None,
1557 scale: None,
1558 size: None,
1559 character_set: None,
1560 with_local_time_zone: None,
1561 fraction: None,
1562 };
1563
1564 let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1565 assert!(matches!(
1566 result,
1567 arrow::datatypes::DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano)
1568 ));
1569 }
1570
1571 #[test]
1572 fn test_exasol_datatype_to_arrow_interval_day_to_second() {
1573 let data_type = DataType {
1574 type_name: "INTERVAL DAY TO SECOND".to_string(),
1575 precision: None,
1576 scale: None,
1577 size: None,
1578 character_set: None,
1579 with_local_time_zone: None,
1580 fraction: Some(6),
1581 };
1582
1583 let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1584 assert!(matches!(
1585 result,
1586 arrow::datatypes::DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano)
1587 ));
1588 }
1589
1590 #[test]
1591 fn test_exasol_datatype_to_arrow_interval_day_to_second_default_fraction() {
1592 let data_type = DataType {
1593 type_name: "INTERVAL DAY TO SECOND".to_string(),
1594 precision: None,
1595 scale: None,
1596 size: None,
1597 character_set: None,
1598 with_local_time_zone: None,
1599 fraction: None, };
1601
1602 let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1603 assert!(matches!(
1604 result,
1605 arrow::datatypes::DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano)
1606 ));
1607 }
1608
1609 #[test]
1610 fn test_exasol_datatype_to_arrow_geometry() {
1611 let data_type = DataType {
1612 type_name: "GEOMETRY".to_string(),
1613 precision: None,
1614 scale: None,
1615 size: None,
1616 character_set: None,
1617 with_local_time_zone: None,
1618 fraction: None,
1619 };
1620
1621 let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1622 assert_eq!(result, arrow::datatypes::DataType::Binary);
1623 }
1624
1625 #[test]
1626 fn test_exasol_datatype_to_arrow_hashtype() {
1627 let data_type = DataType {
1628 type_name: "HASHTYPE".to_string(),
1629 precision: None,
1630 scale: None,
1631 size: None,
1632 character_set: None,
1633 with_local_time_zone: None,
1634 fraction: None,
1635 };
1636
1637 let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1638 assert_eq!(result, arrow::datatypes::DataType::Binary);
1639 }
1640
1641 #[test]
1642 fn test_exasol_datatype_to_arrow_unsupported_type() {
1643 let data_type = DataType {
1644 type_name: "UNKNOWN_TYPE".to_string(),
1645 precision: None,
1646 scale: None,
1647 size: None,
1648 character_set: None,
1649 with_local_time_zone: None,
1650 fraction: None,
1651 };
1652
1653 let result = ResultSet::exasol_datatype_to_arrow(&data_type);
1654 assert!(result.is_err());
1655 }
1656
1657 #[tokio::test]
1662 async fn test_column_major_to_record_batch_with_int32() {
1663 let schema = Arc::new(Schema::new(vec![Field::new(
1664 "value",
1665 arrow::datatypes::DataType::Int32,
1666 true,
1667 )]));
1668
1669 let data = ResultData {
1670 columns: vec![],
1671 data: vec![
1672 vec![serde_json::json!(1)],
1673 vec![serde_json::json!(2)],
1674 vec![serde_json::json!(null)],
1675 vec![serde_json::json!(4)],
1676 ],
1677 total_rows: 4,
1678 };
1679
1680 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1681
1682 assert_eq!(batch.num_rows(), 4);
1683 assert_eq!(batch.num_columns(), 1);
1684
1685 let array = batch
1686 .column(0)
1687 .as_any()
1688 .downcast_ref::<arrow::array::Int32Array>()
1689 .unwrap();
1690 assert_eq!(array.value(0), 1);
1691 assert_eq!(array.value(1), 2);
1692 assert!(array.is_null(2));
1693 assert_eq!(array.value(3), 4);
1694 }
1695
1696 #[tokio::test]
1697 async fn test_column_major_to_record_batch_with_int64() {
1698 let schema = Arc::new(Schema::new(vec![Field::new(
1699 "value",
1700 arrow::datatypes::DataType::Int64,
1701 true,
1702 )]));
1703
1704 let data = ResultData {
1705 columns: vec![],
1706 data: vec![
1707 vec![serde_json::json!(9223372036854775807_i64)], vec![serde_json::json!(-9223372036854775808_i64)], vec![serde_json::json!(null)],
1710 ],
1711 total_rows: 3,
1712 };
1713
1714 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1715
1716 assert_eq!(batch.num_rows(), 3);
1717
1718 let array = batch
1719 .column(0)
1720 .as_any()
1721 .downcast_ref::<arrow::array::Int64Array>()
1722 .unwrap();
1723 assert_eq!(array.value(0), 9223372036854775807_i64);
1724 assert_eq!(array.value(1), -9223372036854775808_i64);
1725 assert!(array.is_null(2));
1726 }
1727
1728 #[tokio::test]
1729 async fn test_column_major_to_record_batch_with_float64() {
1730 let schema = Arc::new(Schema::new(vec![Field::new(
1731 "value",
1732 arrow::datatypes::DataType::Float64,
1733 true,
1734 )]));
1735
1736 let data = ResultData {
1737 columns: vec![],
1738 data: vec![
1739 vec![serde_json::json!(1.23456)],
1740 vec![serde_json::json!(-9.87654)],
1741 vec![serde_json::json!(null)],
1742 vec![serde_json::json!(0.0)],
1743 ],
1744 total_rows: 4,
1745 };
1746
1747 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1748
1749 assert_eq!(batch.num_rows(), 4);
1750
1751 let array = batch
1752 .column(0)
1753 .as_any()
1754 .downcast_ref::<arrow::array::Float64Array>()
1755 .unwrap();
1756 assert!((array.value(0) - 1.23456).abs() < 0.00001);
1757 assert!((array.value(1) - (-9.87654)).abs() < 0.00001);
1758 assert!(array.is_null(2));
1759 assert!((array.value(3) - 0.0).abs() < 0.00001);
1760 }
1761
1762 #[tokio::test]
1763 async fn test_column_major_to_record_batch_with_boolean() {
1764 let schema = Arc::new(Schema::new(vec![Field::new(
1765 "flag",
1766 arrow::datatypes::DataType::Boolean,
1767 true,
1768 )]));
1769
1770 let data = ResultData {
1771 columns: vec![],
1772 data: vec![
1773 vec![serde_json::json!(true)],
1774 vec![serde_json::json!(false)],
1775 vec![serde_json::json!(null)],
1776 ],
1777 total_rows: 3,
1778 };
1779
1780 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1781
1782 assert_eq!(batch.num_rows(), 3);
1783
1784 let array = batch
1785 .column(0)
1786 .as_any()
1787 .downcast_ref::<arrow::array::BooleanArray>()
1788 .unwrap();
1789 assert!(array.value(0));
1790 assert!(!array.value(1));
1791 assert!(array.is_null(2));
1792 }
1793
1794 #[tokio::test]
1795 async fn test_column_major_to_record_batch_with_date32() {
1796 let schema = Arc::new(Schema::new(vec![Field::new(
1797 "date",
1798 arrow::datatypes::DataType::Date32,
1799 true,
1800 )]));
1801
1802 let data = ResultData {
1803 columns: vec![],
1804 data: vec![
1805 vec![serde_json::json!("1970-01-01")],
1806 vec![serde_json::json!("2000-01-01")],
1807 vec![serde_json::json!(null)],
1808 ],
1809 total_rows: 3,
1810 };
1811
1812 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1813
1814 assert_eq!(batch.num_rows(), 3);
1815
1816 let array = batch
1817 .column(0)
1818 .as_any()
1819 .downcast_ref::<arrow::array::Date32Array>()
1820 .unwrap();
1821 assert_eq!(array.value(0), 0); assert_eq!(array.value(1), 10957); assert!(array.is_null(2));
1824 }
1825
1826 #[tokio::test]
1827 async fn test_column_major_to_record_batch_with_timestamp() {
1828 let schema = Arc::new(Schema::new(vec![Field::new(
1829 "timestamp",
1830 arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
1831 true,
1832 )]));
1833
1834 let data = ResultData {
1835 columns: vec![],
1836 data: vec![
1837 vec![serde_json::json!("1970-01-01 00:00:00")],
1838 vec![serde_json::json!("1970-01-01 01:00:00.123456")],
1839 vec![serde_json::json!(null)],
1840 ],
1841 total_rows: 3,
1842 };
1843
1844 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1845
1846 assert_eq!(batch.num_rows(), 3);
1847
1848 let array = batch
1849 .column(0)
1850 .as_any()
1851 .downcast_ref::<arrow::array::TimestampMicrosecondArray>()
1852 .unwrap();
1853 assert_eq!(array.value(0), 0);
1854 assert_eq!(array.value(1), 3600 * 1_000_000 + 123456);
1856 assert!(array.is_null(2));
1857 }
1858
1859 #[tokio::test]
1860 async fn test_column_major_to_record_batch_empty_data() {
1861 let schema = Arc::new(Schema::new(vec![
1862 Field::new("id", arrow::datatypes::DataType::Int64, true),
1863 Field::new("name", arrow::datatypes::DataType::Utf8, true),
1864 ]));
1865
1866 let data = ResultData {
1867 columns: vec![],
1868 data: vec![],
1869 total_rows: 0,
1870 };
1871
1872 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1873
1874 assert_eq!(batch.num_rows(), 0);
1875 assert_eq!(batch.num_columns(), 2);
1876 }
1877
1878 #[tokio::test]
1879 async fn test_column_major_to_record_batch_with_utf8_string() {
1880 let schema = Arc::new(Schema::new(vec![Field::new(
1881 "text",
1882 arrow::datatypes::DataType::Utf8,
1883 true,
1884 )]));
1885
1886 let data = ResultData {
1887 columns: vec![],
1888 data: vec![
1889 vec![serde_json::json!("hello")],
1890 vec![serde_json::json!("world")],
1891 vec![serde_json::json!(null)],
1892 vec![serde_json::json!("")], ],
1894 total_rows: 4,
1895 };
1896
1897 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1898
1899 assert_eq!(batch.num_rows(), 4);
1900
1901 let array = batch
1902 .column(0)
1903 .as_any()
1904 .downcast_ref::<arrow::array::StringArray>()
1905 .unwrap();
1906 assert_eq!(array.value(0), "hello");
1907 assert_eq!(array.value(1), "world");
1908 assert!(array.is_null(2));
1909 assert_eq!(array.value(3), "");
1910 }
1911
1912 #[tokio::test]
1913 async fn test_column_major_to_record_batch_utf8_from_non_string_value() {
1914 let schema = Arc::new(Schema::new(vec![Field::new(
1916 "text",
1917 arrow::datatypes::DataType::Utf8,
1918 true,
1919 )]));
1920
1921 let data = ResultData {
1922 columns: vec![],
1923 data: vec![
1924 vec![serde_json::json!(123)], ],
1926 total_rows: 1,
1927 };
1928
1929 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1930
1931 let array = batch
1932 .column(0)
1933 .as_any()
1934 .downcast_ref::<arrow::array::StringArray>()
1935 .unwrap();
1936 assert_eq!(array.value(0), "123");
1937 }
1938
1939 #[tokio::test]
1940 async fn test_column_major_to_record_batch_decimal_from_string() {
1941 let schema = Arc::new(Schema::new(vec![Field::new(
1942 "amount",
1943 arrow::datatypes::DataType::Decimal128(18, 2),
1944 true,
1945 )]));
1946
1947 let data = ResultData {
1948 columns: vec![],
1949 data: vec![
1950 vec![serde_json::json!("123.45")],
1951 vec![serde_json::json!("-67.89")],
1952 vec![serde_json::json!(null)],
1953 ],
1954 total_rows: 3,
1955 };
1956
1957 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1958
1959 assert_eq!(batch.num_rows(), 3);
1960
1961 let array = batch
1962 .column(0)
1963 .as_any()
1964 .downcast_ref::<arrow::array::Decimal128Array>()
1965 .unwrap();
1966 assert_eq!(array.value(0), 12345); assert_eq!(array.value(1), -6789); assert!(array.is_null(2));
1969 }
1970
1971 #[tokio::test]
1972 async fn test_column_major_to_record_batch_decimal_from_integer() {
1973 let schema = Arc::new(Schema::new(vec![Field::new(
1974 "amount",
1975 arrow::datatypes::DataType::Decimal128(18, 2),
1976 true,
1977 )]));
1978
1979 let data = ResultData {
1980 columns: vec![],
1981 data: vec![
1982 vec![serde_json::json!(100)], ],
1984 total_rows: 1,
1985 };
1986
1987 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1988
1989 let array = batch
1990 .column(0)
1991 .as_any()
1992 .downcast_ref::<arrow::array::Decimal128Array>()
1993 .unwrap();
1994 assert_eq!(array.value(0), 10000); }
1996
1997 #[tokio::test]
1998 async fn test_column_major_to_record_batch_decimal_from_float() {
1999 let schema = Arc::new(Schema::new(vec![Field::new(
2000 "amount",
2001 arrow::datatypes::DataType::Decimal128(18, 2),
2002 true,
2003 )]));
2004
2005 let data = ResultData {
2006 columns: vec![],
2007 data: vec![
2008 vec![serde_json::json!(99.99)], ],
2010 total_rows: 1,
2011 };
2012
2013 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2014
2015 let array = batch
2016 .column(0)
2017 .as_any()
2018 .downcast_ref::<arrow::array::Decimal128Array>()
2019 .unwrap();
2020 assert_eq!(array.value(0), 9999); }
2022
2023 #[tokio::test]
2024 async fn test_column_major_to_record_batch_invalid_date_becomes_null() {
2025 let schema = Arc::new(Schema::new(vec![Field::new(
2026 "date",
2027 arrow::datatypes::DataType::Date32,
2028 true,
2029 )]));
2030
2031 let data = ResultData {
2032 columns: vec![],
2033 data: vec![
2034 vec![serde_json::json!("invalid-date")],
2035 vec![serde_json::json!(123)], ],
2037 total_rows: 2,
2038 };
2039
2040 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2041
2042 let array = batch
2043 .column(0)
2044 .as_any()
2045 .downcast_ref::<arrow::array::Date32Array>()
2046 .unwrap();
2047 assert!(array.is_null(0));
2049 assert!(array.is_null(1));
2050 }
2051
2052 #[tokio::test]
2053 async fn test_column_major_to_record_batch_invalid_timestamp_becomes_null() {
2054 let schema = Arc::new(Schema::new(vec![Field::new(
2055 "timestamp",
2056 arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
2057 true,
2058 )]));
2059
2060 let data = ResultData {
2061 columns: vec![],
2062 data: vec![
2063 vec![serde_json::json!("not-a-timestamp")],
2064 vec![serde_json::json!(12345)], ],
2066 total_rows: 2,
2067 };
2068
2069 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2070
2071 let array = batch
2072 .column(0)
2073 .as_any()
2074 .downcast_ref::<arrow::array::TimestampMicrosecondArray>()
2075 .unwrap();
2076 assert!(array.is_null(0));
2077 assert!(array.is_null(1));
2078 }
2079
2080 #[tokio::test]
2081 async fn test_column_major_to_record_batch_boolean_null_from_non_bool() {
2082 let schema = Arc::new(Schema::new(vec![Field::new(
2083 "flag",
2084 arrow::datatypes::DataType::Boolean,
2085 true,
2086 )]));
2087
2088 let data = ResultData {
2089 columns: vec![],
2090 data: vec![
2091 vec![serde_json::json!("not-a-bool")], vec![serde_json::json!(123)], ],
2094 total_rows: 2,
2095 };
2096
2097 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2098
2099 let array = batch
2100 .column(0)
2101 .as_any()
2102 .downcast_ref::<arrow::array::BooleanArray>()
2103 .unwrap();
2104 assert!(array.is_null(0));
2106 assert!(array.is_null(1));
2107 }
2108
2109 #[tokio::test]
2110 async fn test_column_major_to_record_batch_int32_null_from_non_int() {
2111 let schema = Arc::new(Schema::new(vec![Field::new(
2112 "value",
2113 arrow::datatypes::DataType::Int32,
2114 true,
2115 )]));
2116
2117 let data = ResultData {
2118 columns: vec![],
2119 data: vec![
2120 vec![serde_json::json!("not-an-int")], ],
2122 total_rows: 1,
2123 };
2124
2125 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2126
2127 let array = batch
2128 .column(0)
2129 .as_any()
2130 .downcast_ref::<arrow::array::Int32Array>()
2131 .unwrap();
2132 assert!(array.is_null(0));
2133 }
2134
2135 #[tokio::test]
2136 async fn test_column_major_to_record_batch_float64_null_from_non_float() {
2137 let schema = Arc::new(Schema::new(vec![Field::new(
2138 "value",
2139 arrow::datatypes::DataType::Float64,
2140 true,
2141 )]));
2142
2143 let data = ResultData {
2144 columns: vec![],
2145 data: vec![
2146 vec![serde_json::json!("not-a-float")], ],
2148 total_rows: 1,
2149 };
2150
2151 let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2152
2153 let array = batch
2154 .column(0)
2155 .as_any()
2156 .downcast_ref::<arrow::array::Float64Array>()
2157 .unwrap();
2158 assert!(array.is_null(0));
2159 }
2160
2161 #[tokio::test]
2162 async fn test_column_major_to_record_batch_unsupported_type_returns_error() {
2163 let schema = Arc::new(Schema::new(vec![Field::new(
2166 "large_text",
2167 arrow::datatypes::DataType::LargeUtf8,
2168 true,
2169 )]));
2170
2171 let data = ResultData {
2172 columns: vec![],
2173 data: vec![vec![serde_json::json!("test_data")]],
2174 total_rows: 1,
2175 };
2176
2177 let result = ResultSet::column_major_to_record_batch(&data, &schema);
2178
2179 assert!(result.is_err());
2182 }
2183
2184 #[tokio::test]
2189 async fn test_result_set_iterator_metadata() {
2190 let mock_transport = MockTransport::new();
2191 let transport: Arc<Mutex<dyn TransportProtocol>> = Arc::new(Mutex::new(mock_transport));
2192
2193 let data = ResultData {
2194 columns: vec![ColumnInfo {
2195 name: "id".to_string(),
2196 data_type: DataType {
2197 type_name: "DECIMAL".to_string(),
2198 precision: Some(18),
2199 scale: Some(0),
2200 size: None,
2201 character_set: None,
2202 with_local_time_zone: None,
2203 fraction: None,
2204 },
2205 }],
2206 data: vec![vec![serde_json::json!(1)]],
2207 total_rows: 1,
2208 };
2209
2210 let result = TransportQueryResult::ResultSet {
2211 handle: Some(ResultSetHandle::new(1)),
2212 data,
2213 };
2214
2215 let result_set = ResultSet::from_transport_result(result, transport).unwrap();
2216 let iterator = result_set.into_iterator().unwrap();
2217
2218 let metadata = iterator.metadata();
2219 assert_eq!(metadata.column_count, 1);
2220 assert_eq!(metadata.total_rows, Some(1));
2221 assert_eq!(metadata.column_names(), vec!["id"]);
2222 }
2223
2224 #[tokio::test]
2225 async fn test_result_set_iterator_metadata_multiple_columns() {
2226 let mock_transport = MockTransport::new();
2227 let transport: Arc<Mutex<dyn TransportProtocol>> = Arc::new(Mutex::new(mock_transport));
2228
2229 let data = ResultData {
2230 columns: vec![
2231 ColumnInfo {
2232 name: "id".to_string(),
2233 data_type: DataType {
2234 type_name: "DECIMAL".to_string(),
2235 precision: Some(18),
2236 scale: Some(0),
2237 size: None,
2238 character_set: None,
2239 with_local_time_zone: None,
2240 fraction: None,
2241 },
2242 },
2243 ColumnInfo {
2244 name: "name".to_string(),
2245 data_type: DataType {
2246 type_name: "VARCHAR".to_string(),
2247 precision: None,
2248 scale: None,
2249 size: Some(100),
2250 character_set: Some("UTF8".to_string()),
2251 with_local_time_zone: None,
2252 fraction: None,
2253 },
2254 },
2255 ColumnInfo {
2256 name: "active".to_string(),
2257 data_type: DataType {
2258 type_name: "BOOLEAN".to_string(),
2259 precision: None,
2260 scale: None,
2261 size: None,
2262 character_set: None,
2263 with_local_time_zone: None,
2264 fraction: None,
2265 },
2266 },
2267 ],
2268 data: vec![vec![
2269 serde_json::json!(1),
2270 serde_json::json!("Alice"),
2271 serde_json::json!(true),
2272 ]],
2273 total_rows: 1,
2274 };
2275
2276 let result = TransportQueryResult::ResultSet {
2277 handle: Some(ResultSetHandle::new(1)),
2278 data,
2279 };
2280
2281 let result_set = ResultSet::from_transport_result(result, transport).unwrap();
2282 let iterator = result_set.into_iterator().unwrap();
2283
2284 let metadata = iterator.metadata();
2285 assert_eq!(metadata.column_count, 3);
2286 assert_eq!(metadata.column_names(), vec!["id", "name", "active"]);
2287
2288 let types = metadata.column_types();
2289 assert_eq!(types.len(), 3);
2290 assert!(matches!(
2291 types[0],
2292 arrow::datatypes::DataType::Decimal128(18, 0)
2293 ));
2294 assert!(matches!(types[1], arrow::datatypes::DataType::Utf8));
2295 assert!(matches!(types[2], arrow::datatypes::DataType::Boolean));
2296 }
2297}