Skip to main content

exarrow_rs/query/
results.rs

1//! Result set handling and iteration.
2//!
3//! This module provides types for handling query results, including
4//! streaming result sets and metadata.
5
6use crate::error::{ConversionError, QueryError};
7use crate::transport::messages::{ColumnInfo, ResultData, ResultSetHandle};
8use crate::transport::protocol::QueryResult as TransportQueryResult;
9use crate::transport::TransportProtocol;
10use crate::types::TypeMapper;
11use arrow::array::RecordBatch;
12use arrow::datatypes::{Field, Schema};
13use std::sync::Arc;
14use tokio::sync::Mutex;
15
16/// Metadata about a query execution.
17#[derive(Debug, Clone)]
18pub struct QueryMetadata {
19    /// Schema of the result set
20    pub schema: Arc<Schema>,
21    /// Total number of rows (if known)
22    pub total_rows: Option<i64>,
23    /// Number of columns
24    pub column_count: usize,
25    /// Execution time in milliseconds (if available)
26    pub execution_time_ms: Option<u64>,
27}
28
29impl QueryMetadata {
30    /// Create metadata from schema and row count.
31    pub fn new(schema: Arc<Schema>, total_rows: Option<i64>) -> Self {
32        Self {
33            column_count: schema.fields().len(),
34            schema,
35            total_rows,
36            execution_time_ms: None,
37        }
38    }
39
40    /// Set execution time.
41    pub fn with_execution_time(mut self, execution_time_ms: u64) -> Self {
42        self.execution_time_ms = Some(execution_time_ms);
43        self
44    }
45
46    /// Get column names.
47    pub fn column_names(&self) -> Vec<&str> {
48        self.schema
49            .fields()
50            .iter()
51            .map(|f| f.name().as_str())
52            .collect()
53    }
54
55    /// Get column types.
56    pub fn column_types(&self) -> Vec<&arrow::datatypes::DataType> {
57        self.schema.fields().iter().map(|f| f.data_type()).collect()
58    }
59}
60
61/// Query result set that can be either a row count or streaming data.
62pub struct ResultSet {
63    /// Result type
64    inner: ResultSetInner,
65    /// Transport reference for fetching more data
66    transport: Arc<Mutex<dyn TransportProtocol>>,
67}
68
69impl std::fmt::Debug for ResultSet {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct("ResultSet")
72            .field("inner", &self.inner)
73            .field("transport", &"<TransportProtocol>")
74            .finish()
75    }
76}
77
78#[derive(Debug)]
79enum ResultSetInner {
80    /// Row count result (INSERT, UPDATE, DELETE)
81    RowCount { count: i64 },
82    /// Streaming result set (SELECT)
83    Stream {
84        /// Result set handle for fetching
85        handle: Option<ResultSetHandle>,
86        /// Query metadata
87        metadata: QueryMetadata,
88        /// Buffered batches
89        batches: Vec<RecordBatch>,
90        /// Whether all data has been fetched
91        complete: bool,
92    },
93}
94
95impl ResultSet {
96    /// Create a result set from a transport query result.
97    pub(crate) fn from_transport_result(
98        result: TransportQueryResult,
99        transport: Arc<Mutex<dyn TransportProtocol>>,
100    ) -> Result<Self, QueryError> {
101        match result {
102            TransportQueryResult::RowCount { count } => Ok(Self {
103                inner: ResultSetInner::RowCount { count },
104                transport,
105            }),
106            TransportQueryResult::ResultSet { handle, data } => {
107                let schema = Self::build_schema(&data.columns)
108                    .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
109
110                let metadata = QueryMetadata::new(Arc::clone(&schema), Some(data.total_rows));
111
112                // Convert initial data to RecordBatch
113                // Note: data.data is in column-major format (each inner array is a column)
114                let batches = if !data.data.is_empty() {
115                    vec![Self::column_major_to_record_batch(&data, &schema)
116                        .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?]
117                } else {
118                    Vec::new()
119                };
120
121                // Result is complete if all data was in initial response OR no handle provided
122                // For column-major data, the number of rows is determined by the length of the first column
123                let num_rows_received = if data.data.is_empty() {
124                    0
125                } else {
126                    data.data[0].len() as i64
127                };
128                let complete = handle.is_none() || data.total_rows == num_rows_received;
129
130                Ok(Self {
131                    inner: ResultSetInner::Stream {
132                        handle, // Already Option<ResultSetHandle>
133                        metadata,
134                        batches,
135                        complete,
136                    },
137                    transport,
138                })
139            }
140        }
141    }
142
143    /// Get the row count if this is a row count result.
144    pub fn row_count(&self) -> Option<i64> {
145        match &self.inner {
146            ResultSetInner::RowCount { count } => Some(*count),
147            _ => None,
148        }
149    }
150
151    /// Get the metadata if this is a streaming result.
152    pub fn metadata(&self) -> Option<&QueryMetadata> {
153        match &self.inner {
154            ResultSetInner::Stream { metadata, .. } => Some(metadata),
155            _ => None,
156        }
157    }
158
159    /// Check if this is a streaming result set.
160    pub fn is_stream(&self) -> bool {
161        matches!(&self.inner, ResultSetInner::Stream { .. })
162    }
163
164    /// Convert to an iterator over RecordBatches.
165    ///
166    /// # Errors
167    /// Returns `QueryError::NoResultSet` if this is not a streaming result.
168    pub fn into_iterator(self) -> Result<ResultSetIterator, QueryError> {
169        match self.inner {
170            ResultSetInner::Stream {
171                handle,
172                metadata,
173                batches,
174                complete,
175            } => Ok(ResultSetIterator {
176                handle,
177                transport: self.transport,
178                metadata,
179                batches,
180                current_index: 0,
181                complete,
182            }),
183            ResultSetInner::RowCount { .. } => Err(QueryError::NoResultSet(
184                "Cannot iterate over row count result".to_string(),
185            )),
186        }
187    }
188
189    /// Fetch all remaining batches into memory.
190    ///
191    /// # Errors
192    /// Returns `QueryError` if fetching fails or if this is not a streaming result.
193    pub async fn fetch_all(mut self) -> Result<Vec<RecordBatch>, QueryError> {
194        match &mut self.inner {
195            ResultSetInner::Stream {
196                handle,
197                metadata,
198                batches,
199                complete,
200            } => {
201                let all_batches = if *complete {
202                    batches.clone()
203                } else {
204                    let mut all_batches = batches.clone();
205
206                    // Fetch remaining data
207                    if let Some(handle_val) = handle {
208                        loop {
209                            let mut transport = self.transport.lock().await;
210                            let result_data = transport
211                                .fetch_results(*handle_val)
212                                .await
213                                .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
214
215                            if result_data.data.is_empty() {
216                                *complete = true;
217                                break;
218                            }
219
220                            let batch =
221                                Self::column_major_to_record_batch(&result_data, &metadata.schema)
222                                    .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
223
224                            all_batches.push(batch);
225
226                            // Check if we've fetched everything
227                            if result_data.total_rows > 0
228                                && all_batches.iter().map(|b| b.num_rows()).sum::<usize>()
229                                    >= result_data.total_rows as usize
230                            {
231                                *complete = true;
232                                break;
233                            }
234                        }
235                    }
236
237                    *batches = all_batches.clone();
238                    all_batches
239                };
240
241                // Close the result set handle on the server to release resources
242                if let Some(handle_val) = handle.take() {
243                    let mut transport = self.transport.lock().await;
244                    // Ignore close errors - we've already fetched the data
245                    let _ = transport.close_result_set(handle_val).await;
246                }
247
248                Ok(all_batches)
249            }
250            ResultSetInner::RowCount { .. } => Err(QueryError::NoResultSet(
251                "Cannot fetch batches from row count result".to_string(),
252            )),
253        }
254    }
255
256    /// Build Arrow schema from column information.
257    fn build_schema(columns: &[ColumnInfo]) -> Result<Arc<Schema>, ConversionError> {
258        let fields: Result<Vec<Field>, ConversionError> = columns
259            .iter()
260            .map(|col| {
261                // Parse Exasol type from DataType
262                let arrow_type = Self::exasol_datatype_to_arrow(&col.data_type)?;
263
264                // All fields are nullable by default in Exasol
265                Ok(Field::new(&col.name, arrow_type, true))
266            })
267            .collect();
268
269        Ok(Arc::new(Schema::new(fields?)))
270    }
271
272    /// Convert Exasol DataType to Arrow DataType.
273    fn exasol_datatype_to_arrow(
274        data_type: &crate::transport::messages::DataType,
275    ) -> Result<arrow::datatypes::DataType, ConversionError> {
276        use crate::types::ExasolType;
277
278        let exasol_type = match data_type.type_name.as_str() {
279            "BOOLEAN" => ExasolType::Boolean,
280            "CHAR" => ExasolType::Char {
281                size: data_type.size.unwrap_or(1) as usize,
282            },
283            "VARCHAR" => ExasolType::Varchar {
284                size: data_type.size.unwrap_or(2000000) as usize,
285            },
286            "DECIMAL" => ExasolType::Decimal {
287                precision: data_type.precision.unwrap_or(18) as u8,
288                scale: data_type.scale.unwrap_or(0) as i8,
289            },
290            "DOUBLE" => ExasolType::Double,
291            "DATE" => ExasolType::Date,
292            "TIMESTAMP" => ExasolType::Timestamp {
293                with_local_time_zone: data_type.with_local_time_zone.unwrap_or(false),
294            },
295            "TIMESTAMP WITH LOCAL TIME ZONE" => ExasolType::Timestamp {
296                with_local_time_zone: true,
297            },
298            "INTERVAL YEAR TO MONTH" => ExasolType::IntervalYearToMonth,
299            "INTERVAL DAY TO SECOND" => ExasolType::IntervalDayToSecond {
300                precision: data_type.fraction.unwrap_or(3) as u8,
301            },
302            "GEOMETRY" => ExasolType::Geometry { srid: None },
303            "HASHTYPE" => ExasolType::Hashtype { byte_size: 16 },
304            _ => {
305                return Err(ConversionError::UnsupportedType {
306                    exasol_type: data_type.type_name.clone(),
307                })
308            }
309        };
310
311        TypeMapper::exasol_to_arrow(&exasol_type, true)
312    }
313
314    /// Convert row-major result data to RecordBatch.
315    ///
316    /// Data is expected in row-major format where `data[row_idx][col_idx]` contains
317    /// the value at that position.
318    ///
319    /// Example: For a result with 2 columns (id, name) and 3 rows:
320    fn column_major_to_record_batch(
321        data: &ResultData,
322        schema: &Arc<Schema>,
323    ) -> Result<RecordBatch, ConversionError> {
324        use arrow::array::*;
325        use serde_json::Value;
326
327        if data.data.is_empty() {
328            // Create empty batch with schema
329            let empty_arrays: Vec<Arc<dyn Array>> = schema
330                .fields()
331                .iter()
332                .map(|field| new_empty_array(field.data_type()))
333                .collect();
334
335            return RecordBatch::try_new(Arc::clone(schema), empty_arrays)
336                .map_err(|e| ConversionError::ArrowError(e.to_string()));
337        }
338
339        // Extract column values from row-major data
340        let num_columns = schema.fields().len();
341        let column_values: Vec<Vec<&Value>> = (0..num_columns)
342            .map(|col_idx| {
343                data.data
344                    .iter()
345                    .map(|row| row.get(col_idx).unwrap_or(&Value::Null))
346                    .collect()
347            })
348            .collect();
349
350        let mut arrays: Vec<Arc<dyn Array>> = Vec::new();
351
352        for (col_idx, field) in schema.fields().iter().enumerate() {
353            use arrow::datatypes::DataType;
354
355            // Get the column data (all values for this column)
356            let col_values = &column_values[col_idx];
357
358            // Build array based on data type
359            let array: Arc<dyn Array> = match field.data_type() {
360                DataType::Boolean => {
361                    let mut builder = BooleanBuilder::new();
362                    for value in col_values {
363                        if value.is_null() {
364                            builder.append_null();
365                        } else if let Some(b) = value.as_bool() {
366                            builder.append_value(b);
367                        } else {
368                            builder.append_null();
369                        }
370                    }
371                    Arc::new(builder.finish())
372                }
373                DataType::Int32 => {
374                    let mut builder = Int32Builder::new();
375                    for value in col_values {
376                        if value.is_null() {
377                            builder.append_null();
378                        } else if let Some(i) = value.as_i64() {
379                            builder.append_value(i as i32);
380                        } else {
381                            builder.append_null();
382                        }
383                    }
384                    Arc::new(builder.finish())
385                }
386                DataType::Int64 => {
387                    let mut builder = Int64Builder::new();
388                    for value in col_values {
389                        if value.is_null() {
390                            builder.append_null();
391                        } else if let Some(i) = value.as_i64() {
392                            builder.append_value(i);
393                        } else {
394                            builder.append_null();
395                        }
396                    }
397                    Arc::new(builder.finish())
398                }
399                DataType::Float64 => {
400                    let mut builder = Float64Builder::new();
401                    for value in col_values {
402                        if value.is_null() {
403                            builder.append_null();
404                        } else if let Some(f) = value.as_f64() {
405                            builder.append_value(f);
406                        } else {
407                            builder.append_null();
408                        }
409                    }
410                    Arc::new(builder.finish())
411                }
412                DataType::Utf8 => {
413                    let mut builder = StringBuilder::new();
414                    for value in col_values {
415                        if value.is_null() {
416                            builder.append_null();
417                        } else if let Some(s) = value.as_str() {
418                            builder.append_value(s);
419                        } else {
420                            // Convert to string
421                            builder.append_value(value.to_string());
422                        }
423                    }
424                    Arc::new(builder.finish())
425                }
426                DataType::Decimal128(precision, scale) => {
427                    let mut builder = Decimal128Builder::new()
428                        .with_precision_and_scale(*precision, *scale)
429                        .map_err(|e| ConversionError::ArrowError(e.to_string()))?;
430
431                    for value in col_values {
432                        if value.is_null() {
433                            builder.append_null();
434                        } else if let Some(s) = value.as_str() {
435                            // Parse string decimal (most common format from Exasol)
436                            let scaled = Self::parse_string_to_decimal(s, *scale)?;
437                            builder.append_value(scaled);
438                        } else if let Some(i) = value.as_i64() {
439                            // Scale the integer value
440                            let scaled = i * 10i64.pow(*scale as u32);
441                            builder.append_value(scaled as i128);
442                        } else if let Some(f) = value.as_f64() {
443                            let scaled = (f * 10f64.powi(*scale as i32)) as i128;
444                            builder.append_value(scaled);
445                        } else {
446                            builder.append_null();
447                        }
448                    }
449                    Arc::new(builder.finish())
450                }
451                DataType::Date32 => {
452                    let mut builder = Date32Builder::new();
453                    for value in col_values {
454                        if value.is_null() {
455                            builder.append_null();
456                        } else if let Some(s) = value.as_str() {
457                            // Parse date string "YYYY-MM-DD" to days since Unix epoch
458                            match Self::parse_date_to_days(s) {
459                                Ok(days) => builder.append_value(days),
460                                Err(_) => builder.append_null(),
461                            }
462                        } else {
463                            builder.append_null();
464                        }
465                    }
466                    Arc::new(builder.finish())
467                }
468                DataType::Timestamp(_, tz) => {
469                    let mut builder = TimestampMicrosecondBuilder::new();
470                    for value in col_values {
471                        if value.is_null() {
472                            builder.append_null();
473                        } else if let Some(s) = value.as_str() {
474                            match Self::parse_timestamp_to_micros(s) {
475                                Ok(micros) => builder.append_value(micros),
476                                Err(_) => builder.append_null(),
477                            }
478                        } else {
479                            builder.append_null();
480                        }
481                    }
482                    let array = builder.finish();
483                    if tz.is_some() {
484                        Arc::new(array.with_timezone("UTC"))
485                    } else {
486                        Arc::new(array)
487                    }
488                }
489                _ => {
490                    // Fallback to string for unsupported types
491                    let mut builder = StringBuilder::new();
492                    for value in col_values {
493                        if value.is_null() {
494                            builder.append_null();
495                        } else {
496                            builder.append_value(value.to_string());
497                        }
498                    }
499                    Arc::new(builder.finish())
500                }
501            };
502
503            arrays.push(array);
504        }
505
506        RecordBatch::try_new(Arc::clone(schema), arrays)
507            .map_err(|e| ConversionError::ArrowError(e.to_string()))
508    }
509
510    /// Parse a date string "YYYY-MM-DD" to days since Unix epoch (1970-01-01).
511    fn parse_date_to_days(date_str: &str) -> Result<i32, ()> {
512        let parts: Vec<&str> = date_str.split('-').collect();
513        if parts.len() != 3 {
514            return Err(());
515        }
516
517        let year: i32 = parts[0].parse().map_err(|_| ())?;
518        let month: u32 = parts[1].parse().map_err(|_| ())?;
519        let day: u32 = parts[2].parse().map_err(|_| ())?;
520
521        if !(1..=12).contains(&month) || !(1..=31).contains(&day) {
522            return Err(());
523        }
524
525        // Calculate days since Unix epoch
526        let days_from_year =
527            (year - 1970) * 365 + (year - 1969) / 4 - (year - 1901) / 100 + (year - 1601) / 400;
528        let days_from_month = match month {
529            1 => 0,
530            2 => 31,
531            3 => 59,
532            4 => 90,
533            5 => 120,
534            6 => 151,
535            7 => 181,
536            8 => 212,
537            9 => 243,
538            10 => 273,
539            11 => 304,
540            12 => 334,
541            _ => return Err(()),
542        };
543
544        // Add leap day if after February and leap year
545        let is_leap_year = (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0);
546        let leap_adjustment = if month > 2 && is_leap_year { 1 } else { 0 };
547
548        Ok(days_from_year + days_from_month + day as i32 - 1 + leap_adjustment)
549    }
550
551    /// Parse a timestamp string to microseconds since Unix epoch.
552    fn parse_timestamp_to_micros(timestamp_str: &str) -> Result<i64, ()> {
553        // Split date and time
554        let parts: Vec<&str> = timestamp_str.split(' ').collect();
555        if parts.is_empty() {
556            return Err(());
557        }
558
559        // Parse date part
560        let days = Self::parse_date_to_days(parts[0])?;
561        let mut micros = days as i64 * 86400 * 1_000_000;
562
563        // Parse time part if present
564        if parts.len() > 1 {
565            let time_parts: Vec<&str> = parts[1].split(':').collect();
566            if time_parts.len() >= 2 {
567                let hours: i64 = time_parts[0].parse().map_err(|_| ())?;
568                let minutes: i64 = time_parts[1].parse().map_err(|_| ())?;
569
570                micros += hours * 3600 * 1_000_000;
571                micros += minutes * 60 * 1_000_000;
572
573                if time_parts.len() >= 3 {
574                    // Parse seconds and microseconds
575                    let sec_parts: Vec<&str> = time_parts[2].split('.').collect();
576                    let seconds: i64 = sec_parts[0].parse().map_err(|_| ())?;
577
578                    micros += seconds * 1_000_000;
579
580                    if sec_parts.len() > 1 {
581                        // Parse fractional seconds (microseconds)
582                        let frac = sec_parts[1];
583                        let frac_micros = if frac.len() <= 6 {
584                            let padding = 6 - frac.len();
585                            let padded = format!("{}{}", frac, "0".repeat(padding));
586                            padded.parse::<i64>().unwrap_or(0)
587                        } else {
588                            frac[..6].parse::<i64>().unwrap_or(0)
589                        };
590                        micros += frac_micros;
591                    }
592                }
593            }
594        }
595
596        Ok(micros)
597    }
598
599    /// Parse a decimal string to i128 scaled value.
600    ///
601    /// Handles formats like "123", "123.45", "-123.45"
602    fn parse_string_to_decimal(s: &str, scale: i8) -> Result<i128, ConversionError> {
603        // Handle empty string
604        if s.is_empty() {
605            return Err(ConversionError::InvalidFormat(
606                "Empty decimal string".to_string(),
607            ));
608        }
609
610        // Split on decimal point
611        let parts: Vec<&str> = s.split('.').collect();
612
613        let (integer_part, decimal_part) = match parts.len() {
614            1 => (parts[0], ""),
615            2 => (parts[0], parts[1]),
616            _ => {
617                return Err(ConversionError::InvalidFormat(format!(
618                    "Invalid decimal format: {}",
619                    s
620                )));
621            }
622        };
623
624        // Parse the integer part
625        let mut result: i128 = integer_part.parse().map_err(|_| {
626            ConversionError::InvalidFormat(format!("Invalid integer part: {}", integer_part))
627        })?;
628
629        // Scale up by 10^scale
630        result = result
631            .checked_mul(10_i128.pow(scale as u32))
632            .ok_or_else(|| ConversionError::InvalidFormat("Decimal overflow".to_string()))?;
633
634        // Add the decimal part
635        if !decimal_part.is_empty() {
636            let decimal_digits = decimal_part.len().min(scale as usize);
637            let decimal_value: i128 = decimal_part[..decimal_digits].parse().map_err(|_| {
638                ConversionError::InvalidFormat(format!("Invalid decimal part: {}", decimal_part))
639            })?;
640
641            // Scale the decimal part appropriately
642            let scale_diff = scale as usize - decimal_digits;
643            let scaled_decimal = decimal_value * 10_i128.pow(scale_diff as u32);
644
645            result = result
646                .checked_add(if integer_part.starts_with('-') {
647                    -scaled_decimal
648                } else {
649                    scaled_decimal
650                })
651                .ok_or_else(|| ConversionError::InvalidFormat("Decimal overflow".to_string()))?;
652        }
653
654        Ok(result)
655    }
656}
657
658/// Iterator over RecordBatches from a result set.
659///
660/// Lazily fetches data from the transport as needed.
661pub struct ResultSetIterator {
662    /// Result set handle
663    handle: Option<ResultSetHandle>,
664    /// Transport reference
665    transport: Arc<Mutex<dyn TransportProtocol>>,
666    /// Query metadata
667    metadata: QueryMetadata,
668    /// Buffered batches
669    batches: Vec<RecordBatch>,
670    /// Current iteration index
671    current_index: usize,
672    /// Whether all data has been fetched
673    complete: bool,
674}
675
676impl ResultSetIterator {
677    /// Get the query metadata.
678    pub fn metadata(&self) -> &QueryMetadata {
679        &self.metadata
680    }
681
682    /// Fetch the next batch from the transport.
683    async fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>, QueryError> {
684        if self.complete {
685            return Ok(None);
686        }
687
688        let handle = match self.handle {
689            Some(h) => h,
690            None => {
691                self.complete = true;
692                return Ok(None);
693            }
694        };
695
696        let mut transport = self.transport.lock().await;
697        let result_data = transport
698            .fetch_results(handle)
699            .await
700            .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
701
702        if result_data.data.is_empty() {
703            self.complete = true;
704            return Ok(None);
705        }
706
707        let batch = ResultSet::column_major_to_record_batch(&result_data, &self.metadata.schema)
708            .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
709
710        Ok(Some(batch))
711    }
712
713    /// Get the next batch synchronously (blocking).
714    ///
715    /// This is useful for implementing sync Iterator trait.
716    pub fn next_batch(&mut self) -> Option<Result<RecordBatch, QueryError>> {
717        // Return buffered batch if available
718        if self.current_index < self.batches.len() {
719            let batch = self.batches[self.current_index].clone();
720            self.current_index += 1;
721            return Some(Ok(batch));
722        }
723
724        // Need to fetch more data
725        if self.complete {
726            return None;
727        }
728
729        // Use block_on for async fetch (not ideal, but works for Phase 1)
730        let runtime = tokio::runtime::Handle::try_current();
731        if let Ok(handle) = runtime {
732            let result = handle.block_on(self.fetch_next_batch());
733            match result {
734                Ok(Some(batch)) => {
735                    self.batches.push(batch.clone());
736                    self.current_index += 1;
737                    Some(Ok(batch))
738                }
739                Ok(None) => None,
740                Err(e) => Some(Err(e)),
741            }
742        } else {
743            // No runtime available
744            Some(Err(QueryError::InvalidState(
745                "No async runtime available".to_string(),
746            )))
747        }
748    }
749
750    /// Close the result set and release resources.
751    pub async fn close(mut self) -> Result<(), QueryError> {
752        if let Some(handle) = self.handle.take() {
753            let mut transport = self.transport.lock().await;
754            transport
755                .close_result_set(handle)
756                .await
757                .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
758        }
759        Ok(())
760    }
761}
762
763// Implement Iterator for ResultSetIterator
764impl Iterator for ResultSetIterator {
765    type Item = Result<RecordBatch, QueryError>;
766
767    fn next(&mut self) -> Option<Self::Item> {
768        self.next_batch()
769    }
770}
771
772#[cfg(test)]
773mod tests {
774    use super::*;
775    use crate::transport::messages::{ColumnInfo, DataType};
776    use crate::transport::protocol::{
777        PreparedStatementHandle, QueryResult as TransportQueryResult,
778    };
779    use arrow::array::Array;
780    use async_trait::async_trait;
781    use mockall::mock;
782
783    // Mock transport for testing
784    mock! {
785        pub Transport {}
786
787        #[async_trait]
788        impl TransportProtocol for Transport {
789            async fn connect(&mut self, params: &crate::transport::protocol::ConnectionParams) -> Result<(), crate::error::TransportError>;
790            async fn authenticate(&mut self, credentials: &crate::transport::protocol::Credentials) -> Result<crate::transport::messages::SessionInfo, crate::error::TransportError>;
791            async fn execute_query(&mut self, sql: &str) -> Result<TransportQueryResult, crate::error::TransportError>;
792            async fn fetch_results(&mut self, handle: ResultSetHandle) -> Result<ResultData, crate::error::TransportError>;
793            async fn close_result_set(&mut self, handle: ResultSetHandle) -> Result<(), crate::error::TransportError>;
794            async fn create_prepared_statement(&mut self, sql: &str) -> Result<PreparedStatementHandle, crate::error::TransportError>;
795            async fn execute_prepared_statement(&mut self, handle: &PreparedStatementHandle, parameters: Option<Vec<Vec<serde_json::Value>>>) -> Result<TransportQueryResult, crate::error::TransportError>;
796            async fn close_prepared_statement(&mut self, handle: &PreparedStatementHandle) -> Result<(), crate::error::TransportError>;
797            async fn close(&mut self) -> Result<(), crate::error::TransportError>;
798            fn is_connected(&self) -> bool;
799            async fn set_autocommit(&mut self, enabled: bool) -> Result<(), crate::error::TransportError>;
800        }
801    }
802
803    #[tokio::test]
804    async fn test_result_set_row_count() {
805        let mock_transport = MockTransport::new();
806        let transport: Arc<Mutex<dyn TransportProtocol>> = Arc::new(Mutex::new(mock_transport));
807
808        let result = TransportQueryResult::RowCount { count: 42 };
809        let result_set = ResultSet::from_transport_result(result, transport).unwrap();
810
811        assert_eq!(result_set.row_count(), Some(42));
812        assert!(!result_set.is_stream());
813        assert!(result_set.metadata().is_none());
814    }
815
816    #[tokio::test]
817    async fn test_result_set_stream() {
818        let mock_transport = MockTransport::new();
819        let transport: Arc<Mutex<dyn TransportProtocol>> = Arc::new(Mutex::new(mock_transport));
820
821        // Row-major data format:
822        // Row 0: [1, "Alice"]
823        // Row 1: [2, "Bob"]
824        let data = ResultData {
825            columns: vec![
826                ColumnInfo {
827                    name: "id".to_string(),
828                    data_type: DataType {
829                        type_name: "DECIMAL".to_string(),
830                        precision: Some(18),
831                        scale: Some(0),
832                        size: None,
833                        character_set: None,
834                        with_local_time_zone: None,
835                        fraction: None,
836                    },
837                },
838                ColumnInfo {
839                    name: "name".to_string(),
840                    data_type: DataType {
841                        type_name: "VARCHAR".to_string(),
842                        precision: None,
843                        scale: None,
844                        size: Some(100),
845                        character_set: Some("UTF8".to_string()),
846                        with_local_time_zone: None,
847                        fraction: None,
848                    },
849                },
850            ],
851            data: vec![
852                vec![serde_json::json!(1), serde_json::json!("Alice")],
853                vec![serde_json::json!(2), serde_json::json!("Bob")],
854            ],
855            total_rows: 2,
856        };
857
858        let result = TransportQueryResult::ResultSet {
859            handle: Some(ResultSetHandle::new(1)),
860            data,
861        };
862
863        let result_set = ResultSet::from_transport_result(result, transport).unwrap();
864
865        assert!(result_set.row_count().is_none());
866        assert!(result_set.is_stream());
867
868        let metadata = result_set.metadata().unwrap();
869        assert_eq!(metadata.column_count, 2);
870        assert_eq!(metadata.total_rows, Some(2));
871        assert_eq!(metadata.column_names(), vec!["id", "name"]);
872    }
873
874    #[tokio::test]
875    async fn test_result_set_to_record_batch() {
876        let schema = Arc::new(Schema::new(vec![
877            Field::new("id", arrow::datatypes::DataType::Decimal128(18, 0), true),
878            Field::new("name", arrow::datatypes::DataType::Utf8, true),
879            Field::new("active", arrow::datatypes::DataType::Boolean, true),
880        ]));
881
882        // Row-major data format:
883        // Row 0: [1, "Alice", true]
884        // Row 1: [2, "Bob", false]
885        let data = ResultData {
886            columns: vec![],
887            data: vec![
888                vec![
889                    serde_json::json!(1),
890                    serde_json::json!("Alice"),
891                    serde_json::json!(true),
892                ],
893                vec![
894                    serde_json::json!(2),
895                    serde_json::json!("Bob"),
896                    serde_json::json!(false),
897                ],
898            ],
899            total_rows: 2,
900        };
901
902        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
903
904        assert_eq!(batch.num_rows(), 2);
905        assert_eq!(batch.num_columns(), 3);
906    }
907
908    #[tokio::test]
909    async fn test_single_row_single_column() {
910        // Test case for: SELECT 42 AS answer
911        let schema = Arc::new(Schema::new(vec![Field::new(
912            "answer",
913            arrow::datatypes::DataType::Decimal128(18, 0),
914            true,
915        )]));
916
917        // Row-major: one row with one value
918        let data = ResultData {
919            columns: vec![],
920            data: vec![
921                vec![serde_json::json!(42)], // Row 0: [42]
922            ],
923            total_rows: 1,
924        };
925
926        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
927
928        assert_eq!(batch.num_rows(), 1, "Should have exactly 1 row");
929        assert_eq!(batch.num_columns(), 1, "Should have exactly 1 column");
930    }
931
932    #[tokio::test]
933    async fn test_single_row_two_columns() {
934        // Test case for: SELECT 42 AS answer, 'hello' AS greeting
935        let schema = Arc::new(Schema::new(vec![
936            Field::new(
937                "answer",
938                arrow::datatypes::DataType::Decimal128(18, 0),
939                true,
940            ),
941            Field::new("greeting", arrow::datatypes::DataType::Utf8, true),
942        ]));
943
944        // Row-major: one row with two values
945        let data = ResultData {
946            columns: vec![],
947            data: vec![
948                vec![serde_json::json!(42), serde_json::json!("hello")], // Row 0: [42, "hello"]
949            ],
950            total_rows: 1,
951        };
952
953        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
954
955        assert_eq!(batch.num_rows(), 1, "Should have exactly 1 row");
956        assert_eq!(batch.num_columns(), 2, "Should have exactly 2 columns");
957    }
958
959    #[tokio::test]
960    async fn test_ten_rows_two_columns() {
961        // Test case for: SELECT LEVEL AS id, 'Row ' || LEVEL AS label FROM DUAL CONNECT BY LEVEL <= 10
962        let schema = Arc::new(Schema::new(vec![
963            Field::new("id", arrow::datatypes::DataType::Decimal128(18, 0), true),
964            Field::new("label", arrow::datatypes::DataType::Utf8, true),
965        ]));
966
967        // Row-major: ten rows, each with two values
968        let data = ResultData {
969            columns: vec![],
970            data: vec![
971                vec![serde_json::json!(1), serde_json::json!("Row 1")],
972                vec![serde_json::json!(2), serde_json::json!("Row 2")],
973                vec![serde_json::json!(3), serde_json::json!("Row 3")],
974                vec![serde_json::json!(4), serde_json::json!("Row 4")],
975                vec![serde_json::json!(5), serde_json::json!("Row 5")],
976                vec![serde_json::json!(6), serde_json::json!("Row 6")],
977                vec![serde_json::json!(7), serde_json::json!("Row 7")],
978                vec![serde_json::json!(8), serde_json::json!("Row 8")],
979                vec![serde_json::json!(9), serde_json::json!("Row 9")],
980                vec![serde_json::json!(10), serde_json::json!("Row 10")],
981            ],
982            total_rows: 10,
983        };
984
985        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
986
987        assert_eq!(batch.num_rows(), 10, "Should have exactly 10 rows");
988        assert_eq!(batch.num_columns(), 2, "Should have exactly 2 columns");
989    }
990
991    #[tokio::test]
992    async fn test_schema_building() {
993        let columns = vec![
994            ColumnInfo {
995                name: "id".to_string(),
996                data_type: DataType {
997                    type_name: "DECIMAL".to_string(),
998                    precision: Some(18),
999                    scale: Some(0),
1000                    size: None,
1001                    character_set: None,
1002                    with_local_time_zone: None,
1003                    fraction: None,
1004                },
1005            },
1006            ColumnInfo {
1007                name: "name".to_string(),
1008                data_type: DataType {
1009                    type_name: "VARCHAR".to_string(),
1010                    precision: None,
1011                    scale: None,
1012                    size: Some(100),
1013                    character_set: Some("UTF8".to_string()),
1014                    with_local_time_zone: None,
1015                    fraction: None,
1016                },
1017            },
1018            ColumnInfo {
1019                name: "created_at".to_string(),
1020                data_type: DataType {
1021                    type_name: "TIMESTAMP".to_string(),
1022                    precision: None,
1023                    scale: None,
1024                    size: None,
1025                    character_set: None,
1026                    with_local_time_zone: Some(false),
1027                    fraction: None,
1028                },
1029            },
1030        ];
1031
1032        let schema = ResultSet::build_schema(&columns).unwrap();
1033
1034        assert_eq!(schema.fields().len(), 3);
1035        assert_eq!(schema.field(0).name(), "id");
1036        assert_eq!(schema.field(1).name(), "name");
1037        assert_eq!(schema.field(2).name(), "created_at");
1038    }
1039
1040    #[test]
1041    fn test_query_metadata() {
1042        let schema = Arc::new(Schema::new(vec![
1043            Field::new("id", arrow::datatypes::DataType::Int64, false),
1044            Field::new("name", arrow::datatypes::DataType::Utf8, true),
1045        ]));
1046
1047        let metadata = QueryMetadata::new(Arc::clone(&schema), Some(100)).with_execution_time(250);
1048
1049        assert_eq!(metadata.column_count, 2);
1050        assert_eq!(metadata.total_rows, Some(100));
1051        assert_eq!(metadata.execution_time_ms, Some(250));
1052        assert_eq!(metadata.column_names(), vec!["id", "name"]);
1053    }
1054
1055    // =========================================================================
1056    // Tests for QueryMetadata::column_types
1057    // =========================================================================
1058
1059    #[test]
1060    fn test_query_metadata_column_types_returns_correct_types() {
1061        let schema = Arc::new(Schema::new(vec![
1062            Field::new("id", arrow::datatypes::DataType::Int64, false),
1063            Field::new("name", arrow::datatypes::DataType::Utf8, true),
1064            Field::new("active", arrow::datatypes::DataType::Boolean, true),
1065        ]));
1066
1067        let metadata = QueryMetadata::new(Arc::clone(&schema), Some(100));
1068        let types = metadata.column_types();
1069
1070        assert_eq!(types.len(), 3);
1071        assert_eq!(types[0], &arrow::datatypes::DataType::Int64);
1072        assert_eq!(types[1], &arrow::datatypes::DataType::Utf8);
1073        assert_eq!(types[2], &arrow::datatypes::DataType::Boolean);
1074    }
1075
1076    #[test]
1077    fn test_query_metadata_column_types_empty_schema() {
1078        let fields: Vec<Field> = vec![];
1079        let schema = Arc::new(Schema::new(fields));
1080
1081        let metadata = QueryMetadata::new(Arc::clone(&schema), None);
1082        let types = metadata.column_types();
1083
1084        assert!(types.is_empty());
1085    }
1086
1087    // =========================================================================
1088    // Tests for ResultSet::parse_date_to_days
1089    // =========================================================================
1090
1091    #[test]
1092    fn test_parse_date_to_days_unix_epoch() {
1093        let result = ResultSet::parse_date_to_days("1970-01-01").unwrap();
1094        assert_eq!(result, 0);
1095    }
1096
1097    #[test]
1098    fn test_parse_date_to_days_after_epoch() {
1099        let result = ResultSet::parse_date_to_days("1970-01-02").unwrap();
1100        assert_eq!(result, 1);
1101    }
1102
1103    #[test]
1104    fn test_parse_date_to_days_year_2000() {
1105        // 2000-01-01 is 10957 days after Unix epoch
1106        let result = ResultSet::parse_date_to_days("2000-01-01").unwrap();
1107        assert_eq!(result, 10957);
1108    }
1109
1110    #[test]
1111    fn test_parse_date_to_days_leap_year() {
1112        // 2000-03-01 should include Feb 29 (leap year)
1113        let result = ResultSet::parse_date_to_days("2000-03-01").unwrap();
1114        // 2000-01-01 = 10957, Jan has 31 days, Feb has 29 days (leap year)
1115        // 10957 + 31 + 29 = 11017
1116        assert_eq!(result, 11017);
1117    }
1118
1119    #[test]
1120    fn test_parse_date_to_days_non_leap_year() {
1121        // 2001-03-01 should NOT include Feb 29 (non-leap year)
1122        let result = ResultSet::parse_date_to_days("2001-03-01").unwrap();
1123        // 2001-01-01 = 11323, Jan has 31 days, Feb has 28 days
1124        // 11323 + 31 + 28 = 11382
1125        assert_eq!(result, 11382);
1126    }
1127
1128    #[test]
1129    fn test_parse_date_to_days_before_epoch() {
1130        // 1969-12-31 is -1 day before Unix epoch
1131        let result = ResultSet::parse_date_to_days("1969-12-31").unwrap();
1132        assert_eq!(result, -1);
1133    }
1134
1135    #[test]
1136    fn test_parse_date_to_days_invalid_format_wrong_separator() {
1137        let result = ResultSet::parse_date_to_days("2000/01/01");
1138        assert!(result.is_err());
1139    }
1140
1141    #[test]
1142    fn test_parse_date_to_days_invalid_format_missing_parts() {
1143        let result = ResultSet::parse_date_to_days("2000-01");
1144        assert!(result.is_err());
1145    }
1146
1147    #[test]
1148    fn test_parse_date_to_days_invalid_month_zero() {
1149        let result = ResultSet::parse_date_to_days("2000-00-01");
1150        assert!(result.is_err());
1151    }
1152
1153    #[test]
1154    fn test_parse_date_to_days_invalid_month_thirteen() {
1155        let result = ResultSet::parse_date_to_days("2000-13-01");
1156        assert!(result.is_err());
1157    }
1158
1159    #[test]
1160    fn test_parse_date_to_days_invalid_day_zero() {
1161        let result = ResultSet::parse_date_to_days("2000-01-00");
1162        assert!(result.is_err());
1163    }
1164
1165    #[test]
1166    fn test_parse_date_to_days_invalid_day_thirty_two() {
1167        let result = ResultSet::parse_date_to_days("2000-01-32");
1168        assert!(result.is_err());
1169    }
1170
1171    #[test]
1172    fn test_parse_date_to_days_invalid_non_numeric() {
1173        let result = ResultSet::parse_date_to_days("YYYY-MM-DD");
1174        assert!(result.is_err());
1175    }
1176
1177    // =========================================================================
1178    // Tests for ResultSet::parse_timestamp_to_micros
1179    // =========================================================================
1180
1181    #[test]
1182    fn test_parse_timestamp_to_micros_date_only() {
1183        let result = ResultSet::parse_timestamp_to_micros("1970-01-01").unwrap();
1184        assert_eq!(result, 0);
1185    }
1186
1187    #[test]
1188    fn test_parse_timestamp_to_micros_with_time() {
1189        // 1970-01-01 01:00:00 = 1 hour = 3600 * 1_000_000 microseconds
1190        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 01:00:00").unwrap();
1191        assert_eq!(result, 3600 * 1_000_000);
1192    }
1193
1194    #[test]
1195    fn test_parse_timestamp_to_micros_with_minutes() {
1196        // 1970-01-01 00:30:00 = 30 minutes = 30 * 60 * 1_000_000 microseconds
1197        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:30:00").unwrap();
1198        assert_eq!(result, 30 * 60 * 1_000_000);
1199    }
1200
1201    #[test]
1202    fn test_parse_timestamp_to_micros_with_seconds() {
1203        // 1970-01-01 00:00:45 = 45 seconds = 45 * 1_000_000 microseconds
1204        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:45").unwrap();
1205        assert_eq!(result, 45 * 1_000_000);
1206    }
1207
1208    #[test]
1209    fn test_parse_timestamp_to_micros_with_fractional_seconds_3_digits() {
1210        // 1970-01-01 00:00:00.123 = 123000 microseconds
1211        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:00.123").unwrap();
1212        assert_eq!(result, 123000);
1213    }
1214
1215    #[test]
1216    fn test_parse_timestamp_to_micros_with_fractional_seconds_6_digits() {
1217        // 1970-01-01 00:00:00.123456 = 123456 microseconds
1218        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:00.123456").unwrap();
1219        assert_eq!(result, 123456);
1220    }
1221
1222    #[test]
1223    fn test_parse_timestamp_to_micros_with_fractional_seconds_more_than_6_digits() {
1224        // 1970-01-01 00:00:00.1234567 should truncate to 123456 microseconds
1225        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:00.1234567").unwrap();
1226        assert_eq!(result, 123456);
1227    }
1228
1229    #[test]
1230    fn test_parse_timestamp_to_micros_with_fractional_seconds_1_digit() {
1231        // 1970-01-01 00:00:00.1 = 100000 microseconds
1232        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:00.1").unwrap();
1233        assert_eq!(result, 100000);
1234    }
1235
1236    #[test]
1237    fn test_parse_timestamp_to_micros_complex_timestamp() {
1238        // 2000-06-15 12:30:45.500
1239        // days = 11124 (from previous calculation for 2000-06-15)
1240        // time = 12*3600 + 30*60 + 45 seconds = 45045 seconds
1241        // micros from time = 45045 * 1_000_000 + 500000
1242        let result = ResultSet::parse_timestamp_to_micros("2000-06-15 12:30:45.500").unwrap();
1243
1244        // Calculate expected value
1245        let days_micros: i64 =
1246            ResultSet::parse_date_to_days("2000-06-15").unwrap() as i64 * 86400 * 1_000_000;
1247        let time_micros: i64 = (12 * 3600 + 30 * 60 + 45) * 1_000_000 + 500000;
1248        assert_eq!(result, days_micros + time_micros);
1249    }
1250
1251    #[test]
1252    fn test_parse_timestamp_to_micros_empty_string() {
1253        let result = ResultSet::parse_timestamp_to_micros("");
1254        assert!(result.is_err());
1255    }
1256
1257    #[test]
1258    fn test_parse_timestamp_to_micros_hours_and_minutes_only() {
1259        // 1970-01-01 01:30 (without seconds)
1260        // hours + minutes = 1*3600 + 30*60 = 5400 seconds = 5400 * 1_000_000 microseconds
1261        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 01:30").unwrap();
1262        assert_eq!(result, 5400 * 1_000_000);
1263    }
1264
1265    // =========================================================================
1266    // Tests for ResultSet::parse_string_to_decimal
1267    // =========================================================================
1268
1269    #[test]
1270    fn test_parse_string_to_decimal_integer() {
1271        let result = ResultSet::parse_string_to_decimal("123", 2).unwrap();
1272        // 123 with scale 2 = 12300
1273        assert_eq!(result, 12300);
1274    }
1275
1276    #[test]
1277    fn test_parse_string_to_decimal_with_decimal_point() {
1278        let result = ResultSet::parse_string_to_decimal("123.45", 2).unwrap();
1279        // 123.45 with scale 2 = 12345
1280        assert_eq!(result, 12345);
1281    }
1282
1283    #[test]
1284    fn test_parse_string_to_decimal_negative() {
1285        let result = ResultSet::parse_string_to_decimal("-123.45", 2).unwrap();
1286        // -123.45 with scale 2 = -12345
1287        assert_eq!(result, -12345);
1288    }
1289
1290    #[test]
1291    fn test_parse_string_to_decimal_zero_scale() {
1292        let result = ResultSet::parse_string_to_decimal("123", 0).unwrap();
1293        assert_eq!(result, 123);
1294    }
1295
1296    #[test]
1297    fn test_parse_string_to_decimal_high_scale() {
1298        let result = ResultSet::parse_string_to_decimal("1.5", 6).unwrap();
1299        // 1.5 with scale 6 = 1500000
1300        assert_eq!(result, 1500000);
1301    }
1302
1303    #[test]
1304    fn test_parse_string_to_decimal_truncates_extra_decimals() {
1305        // When decimal part has more digits than scale
1306        let result = ResultSet::parse_string_to_decimal("1.123456", 3).unwrap();
1307        // 1.123 with scale 3 = 1123 (truncates extra digits)
1308        assert_eq!(result, 1123);
1309    }
1310
1311    #[test]
1312    fn test_parse_string_to_decimal_zero() {
1313        let result = ResultSet::parse_string_to_decimal("0", 2).unwrap();
1314        assert_eq!(result, 0);
1315    }
1316
1317    #[test]
1318    fn test_parse_string_to_decimal_negative_zero() {
1319        let result = ResultSet::parse_string_to_decimal("-0", 2).unwrap();
1320        assert_eq!(result, 0);
1321    }
1322
1323    #[test]
1324    fn test_parse_string_to_decimal_empty_string() {
1325        let result = ResultSet::parse_string_to_decimal("", 2);
1326        assert!(result.is_err());
1327    }
1328
1329    #[test]
1330    fn test_parse_string_to_decimal_multiple_decimal_points() {
1331        let result = ResultSet::parse_string_to_decimal("1.2.3", 2);
1332        assert!(result.is_err());
1333    }
1334
1335    #[test]
1336    fn test_parse_string_to_decimal_non_numeric() {
1337        let result = ResultSet::parse_string_to_decimal("abc", 2);
1338        assert!(result.is_err());
1339    }
1340
1341    #[test]
1342    fn test_parse_string_to_decimal_large_value() {
1343        let result = ResultSet::parse_string_to_decimal("999999999999999999", 0).unwrap();
1344        assert_eq!(result, 999999999999999999_i128);
1345    }
1346
1347    // =========================================================================
1348    // Tests for ResultSet::exasol_datatype_to_arrow
1349    // =========================================================================
1350
1351    #[test]
1352    fn test_exasol_datatype_to_arrow_boolean() {
1353        let data_type = DataType {
1354            type_name: "BOOLEAN".to_string(),
1355            precision: None,
1356            scale: None,
1357            size: None,
1358            character_set: None,
1359            with_local_time_zone: None,
1360            fraction: None,
1361        };
1362
1363        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1364        assert_eq!(result, arrow::datatypes::DataType::Boolean);
1365    }
1366
1367    #[test]
1368    fn test_exasol_datatype_to_arrow_char() {
1369        let data_type = DataType {
1370            type_name: "CHAR".to_string(),
1371            precision: None,
1372            scale: None,
1373            size: Some(10),
1374            character_set: None,
1375            with_local_time_zone: None,
1376            fraction: None,
1377        };
1378
1379        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1380        assert_eq!(result, arrow::datatypes::DataType::Utf8);
1381    }
1382
1383    #[test]
1384    fn test_exasol_datatype_to_arrow_char_default_size() {
1385        let data_type = DataType {
1386            type_name: "CHAR".to_string(),
1387            precision: None,
1388            scale: None,
1389            size: None, // Should default to 1
1390            character_set: None,
1391            with_local_time_zone: None,
1392            fraction: None,
1393        };
1394
1395        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1396        assert_eq!(result, arrow::datatypes::DataType::Utf8);
1397    }
1398
1399    #[test]
1400    fn test_exasol_datatype_to_arrow_varchar() {
1401        let data_type = DataType {
1402            type_name: "VARCHAR".to_string(),
1403            precision: None,
1404            scale: None,
1405            size: Some(100),
1406            character_set: None,
1407            with_local_time_zone: None,
1408            fraction: None,
1409        };
1410
1411        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1412        assert_eq!(result, arrow::datatypes::DataType::Utf8);
1413    }
1414
1415    #[test]
1416    fn test_exasol_datatype_to_arrow_varchar_default_size() {
1417        let data_type = DataType {
1418            type_name: "VARCHAR".to_string(),
1419            precision: None,
1420            scale: None,
1421            size: None, // Should default to 2000000
1422            character_set: None,
1423            with_local_time_zone: None,
1424            fraction: None,
1425        };
1426
1427        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1428        assert_eq!(result, arrow::datatypes::DataType::Utf8);
1429    }
1430
1431    #[test]
1432    fn test_exasol_datatype_to_arrow_decimal() {
1433        let data_type = DataType {
1434            type_name: "DECIMAL".to_string(),
1435            precision: Some(18),
1436            scale: Some(2),
1437            size: None,
1438            character_set: None,
1439            with_local_time_zone: None,
1440            fraction: None,
1441        };
1442
1443        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1444        assert_eq!(result, arrow::datatypes::DataType::Decimal128(18, 2));
1445    }
1446
1447    #[test]
1448    fn test_exasol_datatype_to_arrow_decimal_default_precision_scale() {
1449        let data_type = DataType {
1450            type_name: "DECIMAL".to_string(),
1451            precision: None, // Should default to 18
1452            scale: None,     // Should default to 0
1453            size: None,
1454            character_set: None,
1455            with_local_time_zone: None,
1456            fraction: None,
1457        };
1458
1459        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1460        assert_eq!(result, arrow::datatypes::DataType::Decimal128(18, 0));
1461    }
1462
1463    #[test]
1464    fn test_exasol_datatype_to_arrow_double() {
1465        let data_type = DataType {
1466            type_name: "DOUBLE".to_string(),
1467            precision: None,
1468            scale: None,
1469            size: None,
1470            character_set: None,
1471            with_local_time_zone: None,
1472            fraction: None,
1473        };
1474
1475        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1476        assert_eq!(result, arrow::datatypes::DataType::Float64);
1477    }
1478
1479    #[test]
1480    fn test_exasol_datatype_to_arrow_date() {
1481        let data_type = DataType {
1482            type_name: "DATE".to_string(),
1483            precision: None,
1484            scale: None,
1485            size: None,
1486            character_set: None,
1487            with_local_time_zone: None,
1488            fraction: None,
1489        };
1490
1491        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1492        assert_eq!(result, arrow::datatypes::DataType::Date32);
1493    }
1494
1495    #[test]
1496    fn test_exasol_datatype_to_arrow_timestamp_without_tz() {
1497        let data_type = DataType {
1498            type_name: "TIMESTAMP".to_string(),
1499            precision: None,
1500            scale: None,
1501            size: None,
1502            character_set: None,
1503            with_local_time_zone: Some(false),
1504            fraction: None,
1505        };
1506
1507        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1508        assert!(matches!(
1509            result,
1510            arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None)
1511        ));
1512    }
1513
1514    #[test]
1515    fn test_exasol_datatype_to_arrow_timestamp_with_tz() {
1516        let data_type = DataType {
1517            type_name: "TIMESTAMP".to_string(),
1518            precision: None,
1519            scale: None,
1520            size: None,
1521            character_set: None,
1522            with_local_time_zone: Some(true),
1523            fraction: None,
1524        };
1525
1526        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1527        assert!(matches!(
1528            result,
1529            arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, Some(_))
1530        ));
1531    }
1532
1533    #[test]
1534    fn test_exasol_datatype_to_arrow_timestamp_default_tz() {
1535        let data_type = DataType {
1536            type_name: "TIMESTAMP".to_string(),
1537            precision: None,
1538            scale: None,
1539            size: None,
1540            character_set: None,
1541            with_local_time_zone: None, // Should default to false
1542            fraction: None,
1543        };
1544
1545        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1546        assert!(matches!(
1547            result,
1548            arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None)
1549        ));
1550    }
1551
1552    #[test]
1553    fn test_exasol_datatype_to_arrow_interval_year_to_month() {
1554        let data_type = DataType {
1555            type_name: "INTERVAL YEAR TO MONTH".to_string(),
1556            precision: None,
1557            scale: None,
1558            size: None,
1559            character_set: None,
1560            with_local_time_zone: None,
1561            fraction: None,
1562        };
1563
1564        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1565        assert!(matches!(
1566            result,
1567            arrow::datatypes::DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano)
1568        ));
1569    }
1570
1571    #[test]
1572    fn test_exasol_datatype_to_arrow_interval_day_to_second() {
1573        let data_type = DataType {
1574            type_name: "INTERVAL DAY TO SECOND".to_string(),
1575            precision: None,
1576            scale: None,
1577            size: None,
1578            character_set: None,
1579            with_local_time_zone: None,
1580            fraction: Some(6),
1581        };
1582
1583        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1584        assert!(matches!(
1585            result,
1586            arrow::datatypes::DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano)
1587        ));
1588    }
1589
1590    #[test]
1591    fn test_exasol_datatype_to_arrow_interval_day_to_second_default_fraction() {
1592        let data_type = DataType {
1593            type_name: "INTERVAL DAY TO SECOND".to_string(),
1594            precision: None,
1595            scale: None,
1596            size: None,
1597            character_set: None,
1598            with_local_time_zone: None,
1599            fraction: None, // Should default to 3
1600        };
1601
1602        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1603        assert!(matches!(
1604            result,
1605            arrow::datatypes::DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano)
1606        ));
1607    }
1608
1609    #[test]
1610    fn test_exasol_datatype_to_arrow_geometry() {
1611        let data_type = DataType {
1612            type_name: "GEOMETRY".to_string(),
1613            precision: None,
1614            scale: None,
1615            size: None,
1616            character_set: None,
1617            with_local_time_zone: None,
1618            fraction: None,
1619        };
1620
1621        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1622        assert_eq!(result, arrow::datatypes::DataType::Binary);
1623    }
1624
1625    #[test]
1626    fn test_exasol_datatype_to_arrow_hashtype() {
1627        let data_type = DataType {
1628            type_name: "HASHTYPE".to_string(),
1629            precision: None,
1630            scale: None,
1631            size: None,
1632            character_set: None,
1633            with_local_time_zone: None,
1634            fraction: None,
1635        };
1636
1637        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1638        assert_eq!(result, arrow::datatypes::DataType::Binary);
1639    }
1640
1641    #[test]
1642    fn test_exasol_datatype_to_arrow_unsupported_type() {
1643        let data_type = DataType {
1644            type_name: "UNKNOWN_TYPE".to_string(),
1645            precision: None,
1646            scale: None,
1647            size: None,
1648            character_set: None,
1649            with_local_time_zone: None,
1650            fraction: None,
1651        };
1652
1653        let result = ResultSet::exasol_datatype_to_arrow(&data_type);
1654        assert!(result.is_err());
1655    }
1656
1657    // =========================================================================
1658    // Tests for ResultSet::column_major_to_record_batch with various types
1659    // =========================================================================
1660
1661    #[tokio::test]
1662    async fn test_column_major_to_record_batch_with_int32() {
1663        let schema = Arc::new(Schema::new(vec![Field::new(
1664            "value",
1665            arrow::datatypes::DataType::Int32,
1666            true,
1667        )]));
1668
1669        let data = ResultData {
1670            columns: vec![],
1671            data: vec![
1672                vec![serde_json::json!(1)],
1673                vec![serde_json::json!(2)],
1674                vec![serde_json::json!(null)],
1675                vec![serde_json::json!(4)],
1676            ],
1677            total_rows: 4,
1678        };
1679
1680        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1681
1682        assert_eq!(batch.num_rows(), 4);
1683        assert_eq!(batch.num_columns(), 1);
1684
1685        let array = batch
1686            .column(0)
1687            .as_any()
1688            .downcast_ref::<arrow::array::Int32Array>()
1689            .unwrap();
1690        assert_eq!(array.value(0), 1);
1691        assert_eq!(array.value(1), 2);
1692        assert!(array.is_null(2));
1693        assert_eq!(array.value(3), 4);
1694    }
1695
1696    #[tokio::test]
1697    async fn test_column_major_to_record_batch_with_int64() {
1698        let schema = Arc::new(Schema::new(vec![Field::new(
1699            "value",
1700            arrow::datatypes::DataType::Int64,
1701            true,
1702        )]));
1703
1704        let data = ResultData {
1705            columns: vec![],
1706            data: vec![
1707                vec![serde_json::json!(9223372036854775807_i64)], // Max i64
1708                vec![serde_json::json!(-9223372036854775808_i64)], // Min i64
1709                vec![serde_json::json!(null)],
1710            ],
1711            total_rows: 3,
1712        };
1713
1714        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1715
1716        assert_eq!(batch.num_rows(), 3);
1717
1718        let array = batch
1719            .column(0)
1720            .as_any()
1721            .downcast_ref::<arrow::array::Int64Array>()
1722            .unwrap();
1723        assert_eq!(array.value(0), 9223372036854775807_i64);
1724        assert_eq!(array.value(1), -9223372036854775808_i64);
1725        assert!(array.is_null(2));
1726    }
1727
1728    #[tokio::test]
1729    async fn test_column_major_to_record_batch_with_float64() {
1730        let schema = Arc::new(Schema::new(vec![Field::new(
1731            "value",
1732            arrow::datatypes::DataType::Float64,
1733            true,
1734        )]));
1735
1736        let data = ResultData {
1737            columns: vec![],
1738            data: vec![
1739                vec![serde_json::json!(1.23456)],
1740                vec![serde_json::json!(-9.87654)],
1741                vec![serde_json::json!(null)],
1742                vec![serde_json::json!(0.0)],
1743            ],
1744            total_rows: 4,
1745        };
1746
1747        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1748
1749        assert_eq!(batch.num_rows(), 4);
1750
1751        let array = batch
1752            .column(0)
1753            .as_any()
1754            .downcast_ref::<arrow::array::Float64Array>()
1755            .unwrap();
1756        assert!((array.value(0) - 1.23456).abs() < 0.00001);
1757        assert!((array.value(1) - (-9.87654)).abs() < 0.00001);
1758        assert!(array.is_null(2));
1759        assert!((array.value(3) - 0.0).abs() < 0.00001);
1760    }
1761
1762    #[tokio::test]
1763    async fn test_column_major_to_record_batch_with_boolean() {
1764        let schema = Arc::new(Schema::new(vec![Field::new(
1765            "flag",
1766            arrow::datatypes::DataType::Boolean,
1767            true,
1768        )]));
1769
1770        let data = ResultData {
1771            columns: vec![],
1772            data: vec![
1773                vec![serde_json::json!(true)],
1774                vec![serde_json::json!(false)],
1775                vec![serde_json::json!(null)],
1776            ],
1777            total_rows: 3,
1778        };
1779
1780        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1781
1782        assert_eq!(batch.num_rows(), 3);
1783
1784        let array = batch
1785            .column(0)
1786            .as_any()
1787            .downcast_ref::<arrow::array::BooleanArray>()
1788            .unwrap();
1789        assert!(array.value(0));
1790        assert!(!array.value(1));
1791        assert!(array.is_null(2));
1792    }
1793
1794    #[tokio::test]
1795    async fn test_column_major_to_record_batch_with_date32() {
1796        let schema = Arc::new(Schema::new(vec![Field::new(
1797            "date",
1798            arrow::datatypes::DataType::Date32,
1799            true,
1800        )]));
1801
1802        let data = ResultData {
1803            columns: vec![],
1804            data: vec![
1805                vec![serde_json::json!("1970-01-01")],
1806                vec![serde_json::json!("2000-01-01")],
1807                vec![serde_json::json!(null)],
1808            ],
1809            total_rows: 3,
1810        };
1811
1812        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1813
1814        assert_eq!(batch.num_rows(), 3);
1815
1816        let array = batch
1817            .column(0)
1818            .as_any()
1819            .downcast_ref::<arrow::array::Date32Array>()
1820            .unwrap();
1821        assert_eq!(array.value(0), 0); // Unix epoch
1822        assert_eq!(array.value(1), 10957); // 2000-01-01
1823        assert!(array.is_null(2));
1824    }
1825
1826    #[tokio::test]
1827    async fn test_column_major_to_record_batch_with_timestamp() {
1828        let schema = Arc::new(Schema::new(vec![Field::new(
1829            "timestamp",
1830            arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
1831            true,
1832        )]));
1833
1834        let data = ResultData {
1835            columns: vec![],
1836            data: vec![
1837                vec![serde_json::json!("1970-01-01 00:00:00")],
1838                vec![serde_json::json!("1970-01-01 01:00:00.123456")],
1839                vec![serde_json::json!(null)],
1840            ],
1841            total_rows: 3,
1842        };
1843
1844        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1845
1846        assert_eq!(batch.num_rows(), 3);
1847
1848        let array = batch
1849            .column(0)
1850            .as_any()
1851            .downcast_ref::<arrow::array::TimestampMicrosecondArray>()
1852            .unwrap();
1853        assert_eq!(array.value(0), 0);
1854        // 1 hour + 123456 microseconds
1855        assert_eq!(array.value(1), 3600 * 1_000_000 + 123456);
1856        assert!(array.is_null(2));
1857    }
1858
1859    #[tokio::test]
1860    async fn test_column_major_to_record_batch_empty_data() {
1861        let schema = Arc::new(Schema::new(vec![
1862            Field::new("id", arrow::datatypes::DataType::Int64, true),
1863            Field::new("name", arrow::datatypes::DataType::Utf8, true),
1864        ]));
1865
1866        let data = ResultData {
1867            columns: vec![],
1868            data: vec![],
1869            total_rows: 0,
1870        };
1871
1872        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1873
1874        assert_eq!(batch.num_rows(), 0);
1875        assert_eq!(batch.num_columns(), 2);
1876    }
1877
1878    #[tokio::test]
1879    async fn test_column_major_to_record_batch_with_utf8_string() {
1880        let schema = Arc::new(Schema::new(vec![Field::new(
1881            "text",
1882            arrow::datatypes::DataType::Utf8,
1883            true,
1884        )]));
1885
1886        let data = ResultData {
1887            columns: vec![],
1888            data: vec![
1889                vec![serde_json::json!("hello")],
1890                vec![serde_json::json!("world")],
1891                vec![serde_json::json!(null)],
1892                vec![serde_json::json!("")], // Empty string
1893            ],
1894            total_rows: 4,
1895        };
1896
1897        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1898
1899        assert_eq!(batch.num_rows(), 4);
1900
1901        let array = batch
1902            .column(0)
1903            .as_any()
1904            .downcast_ref::<arrow::array::StringArray>()
1905            .unwrap();
1906        assert_eq!(array.value(0), "hello");
1907        assert_eq!(array.value(1), "world");
1908        assert!(array.is_null(2));
1909        assert_eq!(array.value(3), "");
1910    }
1911
1912    #[tokio::test]
1913    async fn test_column_major_to_record_batch_utf8_from_non_string_value() {
1914        // Test that non-string JSON values are converted to string
1915        let schema = Arc::new(Schema::new(vec![Field::new(
1916            "text",
1917            arrow::datatypes::DataType::Utf8,
1918            true,
1919        )]));
1920
1921        let data = ResultData {
1922            columns: vec![],
1923            data: vec![
1924                vec![serde_json::json!(123)], // Number should be converted to "123"
1925            ],
1926            total_rows: 1,
1927        };
1928
1929        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1930
1931        let array = batch
1932            .column(0)
1933            .as_any()
1934            .downcast_ref::<arrow::array::StringArray>()
1935            .unwrap();
1936        assert_eq!(array.value(0), "123");
1937    }
1938
1939    #[tokio::test]
1940    async fn test_column_major_to_record_batch_decimal_from_string() {
1941        let schema = Arc::new(Schema::new(vec![Field::new(
1942            "amount",
1943            arrow::datatypes::DataType::Decimal128(18, 2),
1944            true,
1945        )]));
1946
1947        let data = ResultData {
1948            columns: vec![],
1949            data: vec![
1950                vec![serde_json::json!("123.45")],
1951                vec![serde_json::json!("-67.89")],
1952                vec![serde_json::json!(null)],
1953            ],
1954            total_rows: 3,
1955        };
1956
1957        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1958
1959        assert_eq!(batch.num_rows(), 3);
1960
1961        let array = batch
1962            .column(0)
1963            .as_any()
1964            .downcast_ref::<arrow::array::Decimal128Array>()
1965            .unwrap();
1966        assert_eq!(array.value(0), 12345); // 123.45 with scale 2
1967        assert_eq!(array.value(1), -6789); // -67.89 with scale 2
1968        assert!(array.is_null(2));
1969    }
1970
1971    #[tokio::test]
1972    async fn test_column_major_to_record_batch_decimal_from_integer() {
1973        let schema = Arc::new(Schema::new(vec![Field::new(
1974            "amount",
1975            arrow::datatypes::DataType::Decimal128(18, 2),
1976            true,
1977        )]));
1978
1979        let data = ResultData {
1980            columns: vec![],
1981            data: vec![
1982                vec![serde_json::json!(100)], // Integer should be scaled
1983            ],
1984            total_rows: 1,
1985        };
1986
1987        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1988
1989        let array = batch
1990            .column(0)
1991            .as_any()
1992            .downcast_ref::<arrow::array::Decimal128Array>()
1993            .unwrap();
1994        assert_eq!(array.value(0), 10000); // 100 with scale 2 = 10000
1995    }
1996
1997    #[tokio::test]
1998    async fn test_column_major_to_record_batch_decimal_from_float() {
1999        let schema = Arc::new(Schema::new(vec![Field::new(
2000            "amount",
2001            arrow::datatypes::DataType::Decimal128(18, 2),
2002            true,
2003        )]));
2004
2005        let data = ResultData {
2006            columns: vec![],
2007            data: vec![
2008                vec![serde_json::json!(99.99)], // Float should be scaled
2009            ],
2010            total_rows: 1,
2011        };
2012
2013        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2014
2015        let array = batch
2016            .column(0)
2017            .as_any()
2018            .downcast_ref::<arrow::array::Decimal128Array>()
2019            .unwrap();
2020        assert_eq!(array.value(0), 9999); // 99.99 with scale 2 = 9999
2021    }
2022
2023    #[tokio::test]
2024    async fn test_column_major_to_record_batch_invalid_date_becomes_null() {
2025        let schema = Arc::new(Schema::new(vec![Field::new(
2026            "date",
2027            arrow::datatypes::DataType::Date32,
2028            true,
2029        )]));
2030
2031        let data = ResultData {
2032            columns: vec![],
2033            data: vec![
2034                vec![serde_json::json!("invalid-date")],
2035                vec![serde_json::json!(123)], // Non-string should become null
2036            ],
2037            total_rows: 2,
2038        };
2039
2040        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2041
2042        let array = batch
2043            .column(0)
2044            .as_any()
2045            .downcast_ref::<arrow::array::Date32Array>()
2046            .unwrap();
2047        // Invalid date formats become null
2048        assert!(array.is_null(0));
2049        assert!(array.is_null(1));
2050    }
2051
2052    #[tokio::test]
2053    async fn test_column_major_to_record_batch_invalid_timestamp_becomes_null() {
2054        let schema = Arc::new(Schema::new(vec![Field::new(
2055            "timestamp",
2056            arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
2057            true,
2058        )]));
2059
2060        let data = ResultData {
2061            columns: vec![],
2062            data: vec![
2063                vec![serde_json::json!("not-a-timestamp")],
2064                vec![serde_json::json!(12345)], // Non-string should become null
2065            ],
2066            total_rows: 2,
2067        };
2068
2069        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2070
2071        let array = batch
2072            .column(0)
2073            .as_any()
2074            .downcast_ref::<arrow::array::TimestampMicrosecondArray>()
2075            .unwrap();
2076        assert!(array.is_null(0));
2077        assert!(array.is_null(1));
2078    }
2079
2080    #[tokio::test]
2081    async fn test_column_major_to_record_batch_boolean_null_from_non_bool() {
2082        let schema = Arc::new(Schema::new(vec![Field::new(
2083            "flag",
2084            arrow::datatypes::DataType::Boolean,
2085            true,
2086        )]));
2087
2088        let data = ResultData {
2089            columns: vec![],
2090            data: vec![
2091                vec![serde_json::json!("not-a-bool")], // String is not a bool
2092                vec![serde_json::json!(123)],          // Number is not a bool
2093            ],
2094            total_rows: 2,
2095        };
2096
2097        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2098
2099        let array = batch
2100            .column(0)
2101            .as_any()
2102            .downcast_ref::<arrow::array::BooleanArray>()
2103            .unwrap();
2104        // Non-bool values become null
2105        assert!(array.is_null(0));
2106        assert!(array.is_null(1));
2107    }
2108
2109    #[tokio::test]
2110    async fn test_column_major_to_record_batch_int32_null_from_non_int() {
2111        let schema = Arc::new(Schema::new(vec![Field::new(
2112            "value",
2113            arrow::datatypes::DataType::Int32,
2114            true,
2115        )]));
2116
2117        let data = ResultData {
2118            columns: vec![],
2119            data: vec![
2120                vec![serde_json::json!("not-an-int")], // String is not parseable as int
2121            ],
2122            total_rows: 1,
2123        };
2124
2125        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2126
2127        let array = batch
2128            .column(0)
2129            .as_any()
2130            .downcast_ref::<arrow::array::Int32Array>()
2131            .unwrap();
2132        assert!(array.is_null(0));
2133    }
2134
2135    #[tokio::test]
2136    async fn test_column_major_to_record_batch_float64_null_from_non_float() {
2137        let schema = Arc::new(Schema::new(vec![Field::new(
2138            "value",
2139            arrow::datatypes::DataType::Float64,
2140            true,
2141        )]));
2142
2143        let data = ResultData {
2144            columns: vec![],
2145            data: vec![
2146                vec![serde_json::json!("not-a-float")], // String is not parseable as float
2147            ],
2148            total_rows: 1,
2149        };
2150
2151        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2152
2153        let array = batch
2154            .column(0)
2155            .as_any()
2156            .downcast_ref::<arrow::array::Float64Array>()
2157            .unwrap();
2158        assert!(array.is_null(0));
2159    }
2160
2161    #[tokio::test]
2162    async fn test_column_major_to_record_batch_unsupported_type_returns_error() {
2163        // Test that unsupported types (like LargeUtf8) result in an ArrowError
2164        // because the fallback creates a StringArray which doesn't match the schema
2165        let schema = Arc::new(Schema::new(vec![Field::new(
2166            "large_text",
2167            arrow::datatypes::DataType::LargeUtf8,
2168            true,
2169        )]));
2170
2171        let data = ResultData {
2172            columns: vec![],
2173            data: vec![vec![serde_json::json!("test_data")]],
2174            total_rows: 1,
2175        };
2176
2177        let result = ResultSet::column_major_to_record_batch(&data, &schema);
2178
2179        // Unsupported types cause an error because the fallback creates a StringArray
2180        // which doesn't match the expected LargeUtf8 type in the schema
2181        assert!(result.is_err());
2182    }
2183
2184    // =========================================================================
2185    // Tests for ResultSetIterator::metadata
2186    // =========================================================================
2187
2188    #[tokio::test]
2189    async fn test_result_set_iterator_metadata() {
2190        let mock_transport = MockTransport::new();
2191        let transport: Arc<Mutex<dyn TransportProtocol>> = Arc::new(Mutex::new(mock_transport));
2192
2193        let data = ResultData {
2194            columns: vec![ColumnInfo {
2195                name: "id".to_string(),
2196                data_type: DataType {
2197                    type_name: "DECIMAL".to_string(),
2198                    precision: Some(18),
2199                    scale: Some(0),
2200                    size: None,
2201                    character_set: None,
2202                    with_local_time_zone: None,
2203                    fraction: None,
2204                },
2205            }],
2206            data: vec![vec![serde_json::json!(1)]],
2207            total_rows: 1,
2208        };
2209
2210        let result = TransportQueryResult::ResultSet {
2211            handle: Some(ResultSetHandle::new(1)),
2212            data,
2213        };
2214
2215        let result_set = ResultSet::from_transport_result(result, transport).unwrap();
2216        let iterator = result_set.into_iterator().unwrap();
2217
2218        let metadata = iterator.metadata();
2219        assert_eq!(metadata.column_count, 1);
2220        assert_eq!(metadata.total_rows, Some(1));
2221        assert_eq!(metadata.column_names(), vec!["id"]);
2222    }
2223
2224    #[tokio::test]
2225    async fn test_result_set_iterator_metadata_multiple_columns() {
2226        let mock_transport = MockTransport::new();
2227        let transport: Arc<Mutex<dyn TransportProtocol>> = Arc::new(Mutex::new(mock_transport));
2228
2229        let data = ResultData {
2230            columns: vec![
2231                ColumnInfo {
2232                    name: "id".to_string(),
2233                    data_type: DataType {
2234                        type_name: "DECIMAL".to_string(),
2235                        precision: Some(18),
2236                        scale: Some(0),
2237                        size: None,
2238                        character_set: None,
2239                        with_local_time_zone: None,
2240                        fraction: None,
2241                    },
2242                },
2243                ColumnInfo {
2244                    name: "name".to_string(),
2245                    data_type: DataType {
2246                        type_name: "VARCHAR".to_string(),
2247                        precision: None,
2248                        scale: None,
2249                        size: Some(100),
2250                        character_set: Some("UTF8".to_string()),
2251                        with_local_time_zone: None,
2252                        fraction: None,
2253                    },
2254                },
2255                ColumnInfo {
2256                    name: "active".to_string(),
2257                    data_type: DataType {
2258                        type_name: "BOOLEAN".to_string(),
2259                        precision: None,
2260                        scale: None,
2261                        size: None,
2262                        character_set: None,
2263                        with_local_time_zone: None,
2264                        fraction: None,
2265                    },
2266                },
2267            ],
2268            data: vec![vec![
2269                serde_json::json!(1),
2270                serde_json::json!("Alice"),
2271                serde_json::json!(true),
2272            ]],
2273            total_rows: 1,
2274        };
2275
2276        let result = TransportQueryResult::ResultSet {
2277            handle: Some(ResultSetHandle::new(1)),
2278            data,
2279        };
2280
2281        let result_set = ResultSet::from_transport_result(result, transport).unwrap();
2282        let iterator = result_set.into_iterator().unwrap();
2283
2284        let metadata = iterator.metadata();
2285        assert_eq!(metadata.column_count, 3);
2286        assert_eq!(metadata.column_names(), vec!["id", "name", "active"]);
2287
2288        let types = metadata.column_types();
2289        assert_eq!(types.len(), 3);
2290        assert!(matches!(
2291            types[0],
2292            arrow::datatypes::DataType::Decimal128(18, 0)
2293        ));
2294        assert!(matches!(types[1], arrow::datatypes::DataType::Utf8));
2295        assert!(matches!(types[2], arrow::datatypes::DataType::Boolean));
2296    }
2297}