// exarrow_rs/query/results.rs
1//! Result set handling and iteration.
2//!
3//! This module provides types for handling query results, including
4//! streaming result sets and metadata.
5
6use crate::error::{ConversionError, QueryError};
7use crate::transport::messages::{ColumnInfo, ResultData, ResultSetHandle};
8use crate::transport::protocol::QueryResult as TransportQueryResult;
9use crate::transport::TransportProtocol;
10use crate::types::TypeMapper;
11use arrow::array::RecordBatch;
12use arrow::datatypes::{Field, Schema};
13use std::sync::Arc;
14use tokio::sync::Mutex;
15
/// Metadata about a query execution.
///
/// Captures the result schema plus optional server-reported statistics.
/// Built by `ResultSet` when a streaming result is created.
#[derive(Debug, Clone)]
pub struct QueryMetadata {
    /// Schema of the result set
    pub schema: Arc<Schema>,
    /// Total number of rows (if known; `None` when the server did not report it)
    pub total_rows: Option<i64>,
    /// Number of columns (always equal to `schema.fields().len()`)
    pub column_count: usize,
    /// Execution time in milliseconds (if available; set via `with_execution_time`)
    pub execution_time_ms: Option<u64>,
}
28
29impl QueryMetadata {
30    /// Create metadata from schema and row count.
31    pub fn new(schema: Arc<Schema>, total_rows: Option<i64>) -> Self {
32        Self {
33            column_count: schema.fields().len(),
34            schema,
35            total_rows,
36            execution_time_ms: None,
37        }
38    }
39
40    /// Set execution time.
41    pub fn with_execution_time(mut self, execution_time_ms: u64) -> Self {
42        self.execution_time_ms = Some(execution_time_ms);
43        self
44    }
45
46    /// Get column names.
47    pub fn column_names(&self) -> Vec<&str> {
48        self.schema
49            .fields()
50            .iter()
51            .map(|f| f.name().as_str())
52            .collect()
53    }
54
55    /// Get column types.
56    pub fn column_types(&self) -> Vec<&arrow::datatypes::DataType> {
57        self.schema.fields().iter().map(|f| f.data_type()).collect()
58    }
59}
60
/// Query result set that can be either a row count or streaming data.
///
/// DML statements (INSERT/UPDATE/DELETE) produce a plain row count; SELECT
/// statements produce a stream that may fetch additional batches lazily
/// through the shared transport.
pub struct ResultSet {
    /// Result type (row count vs. stream)
    inner: ResultSetInner,
    /// Transport reference for fetching more data and for closing the
    /// server-side result handle
    transport: Arc<Mutex<dyn TransportProtocol>>,
}
68
69impl std::fmt::Debug for ResultSet {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct("ResultSet")
72            .field("inner", &self.inner)
73            .field("transport", &"<TransportProtocol>")
74            .finish()
75    }
76}
77
/// Internal representation of a result: either a DML row count or a
/// (possibly partially buffered) streaming SELECT result.
#[derive(Debug)]
enum ResultSetInner {
    /// Row count result (INSERT, UPDATE, DELETE)
    RowCount { count: i64 },
    /// Streaming result set (SELECT)
    Stream {
        /// Result set handle for fetching more data from the server;
        /// `None` when the whole result arrived in the initial response
        handle: Option<ResultSetHandle>,
        /// Query metadata (schema, row counts)
        metadata: QueryMetadata,
        /// Batches received so far, in arrival order
        batches: Vec<RecordBatch>,
        /// Whether all data has been fetched from the server
        complete: bool,
    },
}
94
95impl ResultSet {
96    /// Create a result set from a transport query result.
97    pub(crate) fn from_transport_result(
98        result: TransportQueryResult,
99        transport: Arc<Mutex<dyn TransportProtocol>>,
100    ) -> Result<Self, QueryError> {
101        match result {
102            TransportQueryResult::RowCount { count } => Ok(Self {
103                inner: ResultSetInner::RowCount { count },
104                transport,
105            }),
106            TransportQueryResult::ResultSet { handle, data } => {
107                let schema = Self::build_schema(&data.columns)
108                    .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
109
110                let metadata = QueryMetadata::new(Arc::clone(&schema), Some(data.total_rows));
111
112                // Convert initial data to RecordBatch
113                // Note: data.data is in column-major format (each inner array is a column)
114                let batches = if !data.data.is_empty() {
115                    vec![Self::column_major_to_record_batch(&data, &schema)
116                        .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?]
117                } else {
118                    Vec::new()
119                };
120
121                // Result is complete if all data was in initial response OR no handle provided
122                // For column-major data, the number of rows is determined by the length of the first column
123                let num_rows_received = if data.data.is_empty() {
124                    0
125                } else {
126                    data.data[0].len() as i64
127                };
128                let complete = handle.is_none() || data.total_rows == num_rows_received;
129
130                Ok(Self {
131                    inner: ResultSetInner::Stream {
132                        handle, // Already Option<ResultSetHandle>
133                        metadata,
134                        batches,
135                        complete,
136                    },
137                    transport,
138                })
139            }
140        }
141    }
142
143    /// Get the row count if this is a row count result.
144    pub fn row_count(&self) -> Option<i64> {
145        match &self.inner {
146            ResultSetInner::RowCount { count } => Some(*count),
147            _ => None,
148        }
149    }
150
151    /// Get the metadata if this is a streaming result.
152    pub fn metadata(&self) -> Option<&QueryMetadata> {
153        match &self.inner {
154            ResultSetInner::Stream { metadata, .. } => Some(metadata),
155            _ => None,
156        }
157    }
158
159    /// Check if this is a streaming result set.
160    pub fn is_stream(&self) -> bool {
161        matches!(&self.inner, ResultSetInner::Stream { .. })
162    }
163
164    /// Convert to an iterator over RecordBatches.
165    ///
166    /// # Errors
167    /// Returns `QueryError::NoResultSet` if this is not a streaming result.
168    pub fn into_iterator(self) -> Result<ResultSetIterator, QueryError> {
169        match self.inner {
170            ResultSetInner::Stream {
171                handle,
172                metadata,
173                batches,
174                complete,
175            } => Ok(ResultSetIterator {
176                handle,
177                transport: self.transport,
178                metadata,
179                batches,
180                current_index: 0,
181                complete,
182            }),
183            ResultSetInner::RowCount { .. } => Err(QueryError::NoResultSet(
184                "Cannot iterate over row count result".to_string(),
185            )),
186        }
187    }
188
189    /// Fetch all remaining batches into memory.
190    ///
191    /// # Errors
192    /// Returns `QueryError` if fetching fails or if this is not a streaming result.
193    pub async fn fetch_all(mut self) -> Result<Vec<RecordBatch>, QueryError> {
194        match &mut self.inner {
195            ResultSetInner::Stream {
196                handle,
197                metadata,
198                batches,
199                complete,
200            } => {
201                let all_batches = if *complete {
202                    batches.clone()
203                } else {
204                    let mut all_batches = batches.clone();
205
206                    // Fetch remaining data
207                    if let Some(handle_val) = handle {
208                        loop {
209                            let mut transport = self.transport.lock().await;
210                            let result_data = transport
211                                .fetch_results(*handle_val)
212                                .await
213                                .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
214
215                            if result_data.data.is_empty() {
216                                *complete = true;
217                                break;
218                            }
219
220                            let batch =
221                                Self::column_major_to_record_batch(&result_data, &metadata.schema)
222                                    .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
223
224                            all_batches.push(batch);
225
226                            // Check if we've fetched everything
227                            if result_data.total_rows > 0
228                                && all_batches.iter().map(|b| b.num_rows()).sum::<usize>()
229                                    >= result_data.total_rows as usize
230                            {
231                                *complete = true;
232                                break;
233                            }
234                        }
235                    }
236
237                    *batches = all_batches.clone();
238                    all_batches
239                };
240
241                // Close the result set handle on the server to release resources
242                if let Some(handle_val) = handle.take() {
243                    let mut transport = self.transport.lock().await;
244                    // Ignore close errors - we've already fetched the data
245                    let _ = transport.close_result_set(handle_val).await;
246                }
247
248                Ok(all_batches)
249            }
250            ResultSetInner::RowCount { .. } => Err(QueryError::NoResultSet(
251                "Cannot fetch batches from row count result".to_string(),
252            )),
253        }
254    }
255
256    /// Build Arrow schema from column information.
257    fn build_schema(columns: &[ColumnInfo]) -> Result<Arc<Schema>, ConversionError> {
258        let fields: Result<Vec<Field>, ConversionError> = columns
259            .iter()
260            .map(|col| {
261                // Parse Exasol type from DataType
262                let arrow_type = Self::exasol_datatype_to_arrow(&col.data_type)?;
263
264                // All fields are nullable by default in Exasol
265                Ok(Field::new(&col.name, arrow_type, true))
266            })
267            .collect();
268
269        Ok(Arc::new(Schema::new(fields?)))
270    }
271
272    /// Convert Exasol DataType to Arrow DataType.
273    fn exasol_datatype_to_arrow(
274        data_type: &crate::transport::messages::DataType,
275    ) -> Result<arrow::datatypes::DataType, ConversionError> {
276        use crate::types::ExasolType;
277
278        let exasol_type = match data_type.type_name.as_str() {
279            "BOOLEAN" => ExasolType::Boolean,
280            "CHAR" => ExasolType::Char {
281                size: data_type.size.unwrap_or(1) as usize,
282            },
283            "VARCHAR" => ExasolType::Varchar {
284                size: data_type.size.unwrap_or(2000000) as usize,
285            },
286            "DECIMAL" => ExasolType::Decimal {
287                precision: data_type.precision.unwrap_or(18) as u8,
288                scale: data_type.scale.unwrap_or(0) as i8,
289            },
290            "DOUBLE" => ExasolType::Double,
291            "DATE" => ExasolType::Date,
292            "TIMESTAMP" => ExasolType::Timestamp {
293                with_local_time_zone: data_type.with_local_time_zone.unwrap_or(false),
294            },
295            "INTERVAL YEAR TO MONTH" => ExasolType::IntervalYearToMonth,
296            "INTERVAL DAY TO SECOND" => ExasolType::IntervalDayToSecond {
297                precision: data_type.fraction.unwrap_or(3) as u8,
298            },
299            "GEOMETRY" => ExasolType::Geometry { srid: None },
300            "HASHTYPE" => ExasolType::Hashtype { byte_size: 16 },
301            _ => {
302                return Err(ConversionError::UnsupportedType {
303                    exasol_type: data_type.type_name.clone(),
304                })
305            }
306        };
307
308        TypeMapper::exasol_to_arrow(&exasol_type, true)
309    }
310
311    /// Convert row-major result data to RecordBatch.
312    ///
313    /// Data is expected in row-major format where `data[row_idx][col_idx]` contains
314    /// the value at that position.
315    ///
316    /// Example: For a result with 2 columns (id, name) and 3 rows:
317    fn column_major_to_record_batch(
318        data: &ResultData,
319        schema: &Arc<Schema>,
320    ) -> Result<RecordBatch, ConversionError> {
321        use arrow::array::*;
322        use serde_json::Value;
323
324        if data.data.is_empty() {
325            // Create empty batch with schema
326            let empty_arrays: Vec<Arc<dyn Array>> = schema
327                .fields()
328                .iter()
329                .map(|field| new_empty_array(field.data_type()))
330                .collect();
331
332            return RecordBatch::try_new(Arc::clone(schema), empty_arrays)
333                .map_err(|e| ConversionError::ArrowError(e.to_string()));
334        }
335
336        // Extract column values from row-major data
337        let num_columns = schema.fields().len();
338        let column_values: Vec<Vec<&Value>> = (0..num_columns)
339            .map(|col_idx| {
340                data.data
341                    .iter()
342                    .map(|row| row.get(col_idx).unwrap_or(&Value::Null))
343                    .collect()
344            })
345            .collect();
346
347        let mut arrays: Vec<Arc<dyn Array>> = Vec::new();
348
349        for (col_idx, field) in schema.fields().iter().enumerate() {
350            use arrow::datatypes::DataType;
351
352            // Get the column data (all values for this column)
353            let col_values = &column_values[col_idx];
354
355            // Build array based on data type
356            let array: Arc<dyn Array> = match field.data_type() {
357                DataType::Boolean => {
358                    let mut builder = BooleanBuilder::new();
359                    for value in col_values {
360                        if value.is_null() {
361                            builder.append_null();
362                        } else if let Some(b) = value.as_bool() {
363                            builder.append_value(b);
364                        } else {
365                            builder.append_null();
366                        }
367                    }
368                    Arc::new(builder.finish())
369                }
370                DataType::Int32 => {
371                    let mut builder = Int32Builder::new();
372                    for value in col_values {
373                        if value.is_null() {
374                            builder.append_null();
375                        } else if let Some(i) = value.as_i64() {
376                            builder.append_value(i as i32);
377                        } else {
378                            builder.append_null();
379                        }
380                    }
381                    Arc::new(builder.finish())
382                }
383                DataType::Int64 => {
384                    let mut builder = Int64Builder::new();
385                    for value in col_values {
386                        if value.is_null() {
387                            builder.append_null();
388                        } else if let Some(i) = value.as_i64() {
389                            builder.append_value(i);
390                        } else {
391                            builder.append_null();
392                        }
393                    }
394                    Arc::new(builder.finish())
395                }
396                DataType::Float64 => {
397                    let mut builder = Float64Builder::new();
398                    for value in col_values {
399                        if value.is_null() {
400                            builder.append_null();
401                        } else if let Some(f) = value.as_f64() {
402                            builder.append_value(f);
403                        } else {
404                            builder.append_null();
405                        }
406                    }
407                    Arc::new(builder.finish())
408                }
409                DataType::Utf8 => {
410                    let mut builder = StringBuilder::new();
411                    for value in col_values {
412                        if value.is_null() {
413                            builder.append_null();
414                        } else if let Some(s) = value.as_str() {
415                            builder.append_value(s);
416                        } else {
417                            // Convert to string
418                            builder.append_value(value.to_string());
419                        }
420                    }
421                    Arc::new(builder.finish())
422                }
423                DataType::Decimal128(precision, scale) => {
424                    let mut builder = Decimal128Builder::new()
425                        .with_precision_and_scale(*precision, *scale)
426                        .map_err(|e| ConversionError::ArrowError(e.to_string()))?;
427
428                    for value in col_values {
429                        if value.is_null() {
430                            builder.append_null();
431                        } else if let Some(s) = value.as_str() {
432                            // Parse string decimal (most common format from Exasol)
433                            let scaled = Self::parse_string_to_decimal(s, *scale)?;
434                            builder.append_value(scaled);
435                        } else if let Some(i) = value.as_i64() {
436                            // Scale the integer value
437                            let scaled = i * 10i64.pow(*scale as u32);
438                            builder.append_value(scaled as i128);
439                        } else if let Some(f) = value.as_f64() {
440                            let scaled = (f * 10f64.powi(*scale as i32)) as i128;
441                            builder.append_value(scaled);
442                        } else {
443                            builder.append_null();
444                        }
445                    }
446                    Arc::new(builder.finish())
447                }
448                DataType::Date32 => {
449                    let mut builder = Date32Builder::new();
450                    for value in col_values {
451                        if value.is_null() {
452                            builder.append_null();
453                        } else if let Some(s) = value.as_str() {
454                            // Parse date string "YYYY-MM-DD" to days since Unix epoch
455                            match Self::parse_date_to_days(s) {
456                                Ok(days) => builder.append_value(days),
457                                Err(_) => builder.append_null(),
458                            }
459                        } else {
460                            builder.append_null();
461                        }
462                    }
463                    Arc::new(builder.finish())
464                }
465                DataType::Timestamp(_, _) => {
466                    let mut builder = TimestampMicrosecondBuilder::new();
467                    for value in col_values {
468                        if value.is_null() {
469                            builder.append_null();
470                        } else if let Some(s) = value.as_str() {
471                            // Parse timestamp string to microseconds since Unix epoch
472                            match Self::parse_timestamp_to_micros(s) {
473                                Ok(micros) => builder.append_value(micros),
474                                Err(_) => builder.append_null(),
475                            }
476                        } else {
477                            builder.append_null();
478                        }
479                    }
480                    Arc::new(builder.finish())
481                }
482                _ => {
483                    // Fallback to string for unsupported types
484                    let mut builder = StringBuilder::new();
485                    for value in col_values {
486                        if value.is_null() {
487                            builder.append_null();
488                        } else {
489                            builder.append_value(value.to_string());
490                        }
491                    }
492                    Arc::new(builder.finish())
493                }
494            };
495
496            arrays.push(array);
497        }
498
499        RecordBatch::try_new(Arc::clone(schema), arrays)
500            .map_err(|e| ConversionError::ArrowError(e.to_string()))
501    }
502
503    /// Parse a date string "YYYY-MM-DD" to days since Unix epoch (1970-01-01).
504    fn parse_date_to_days(date_str: &str) -> Result<i32, ()> {
505        let parts: Vec<&str> = date_str.split('-').collect();
506        if parts.len() != 3 {
507            return Err(());
508        }
509
510        let year: i32 = parts[0].parse().map_err(|_| ())?;
511        let month: u32 = parts[1].parse().map_err(|_| ())?;
512        let day: u32 = parts[2].parse().map_err(|_| ())?;
513
514        if !(1..=12).contains(&month) || !(1..=31).contains(&day) {
515            return Err(());
516        }
517
518        // Calculate days since Unix epoch
519        let days_from_year =
520            (year - 1970) * 365 + (year - 1969) / 4 - (year - 1901) / 100 + (year - 1601) / 400;
521        let days_from_month = match month {
522            1 => 0,
523            2 => 31,
524            3 => 59,
525            4 => 90,
526            5 => 120,
527            6 => 151,
528            7 => 181,
529            8 => 212,
530            9 => 243,
531            10 => 273,
532            11 => 304,
533            12 => 334,
534            _ => return Err(()),
535        };
536
537        // Add leap day if after February and leap year
538        let is_leap_year = (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0);
539        let leap_adjustment = if month > 2 && is_leap_year { 1 } else { 0 };
540
541        Ok(days_from_year + days_from_month + day as i32 - 1 + leap_adjustment)
542    }
543
544    /// Parse a timestamp string to microseconds since Unix epoch.
545    fn parse_timestamp_to_micros(timestamp_str: &str) -> Result<i64, ()> {
546        // Split date and time
547        let parts: Vec<&str> = timestamp_str.split(' ').collect();
548        if parts.is_empty() {
549            return Err(());
550        }
551
552        // Parse date part
553        let days = Self::parse_date_to_days(parts[0])?;
554        let mut micros = days as i64 * 86400 * 1_000_000;
555
556        // Parse time part if present
557        if parts.len() > 1 {
558            let time_parts: Vec<&str> = parts[1].split(':').collect();
559            if time_parts.len() >= 2 {
560                let hours: i64 = time_parts[0].parse().map_err(|_| ())?;
561                let minutes: i64 = time_parts[1].parse().map_err(|_| ())?;
562
563                micros += hours * 3600 * 1_000_000;
564                micros += minutes * 60 * 1_000_000;
565
566                if time_parts.len() >= 3 {
567                    // Parse seconds and microseconds
568                    let sec_parts: Vec<&str> = time_parts[2].split('.').collect();
569                    let seconds: i64 = sec_parts[0].parse().map_err(|_| ())?;
570
571                    micros += seconds * 1_000_000;
572
573                    if sec_parts.len() > 1 {
574                        // Parse fractional seconds (microseconds)
575                        let frac = sec_parts[1];
576                        let frac_micros = if frac.len() <= 6 {
577                            let padding = 6 - frac.len();
578                            let padded = format!("{}{}", frac, "0".repeat(padding));
579                            padded.parse::<i64>().unwrap_or(0)
580                        } else {
581                            frac[..6].parse::<i64>().unwrap_or(0)
582                        };
583                        micros += frac_micros;
584                    }
585                }
586            }
587        }
588
589        Ok(micros)
590    }
591
592    /// Parse a decimal string to i128 scaled value.
593    ///
594    /// Handles formats like "123", "123.45", "-123.45"
595    fn parse_string_to_decimal(s: &str, scale: i8) -> Result<i128, ConversionError> {
596        // Handle empty string
597        if s.is_empty() {
598            return Err(ConversionError::InvalidFormat(
599                "Empty decimal string".to_string(),
600            ));
601        }
602
603        // Split on decimal point
604        let parts: Vec<&str> = s.split('.').collect();
605
606        let (integer_part, decimal_part) = match parts.len() {
607            1 => (parts[0], ""),
608            2 => (parts[0], parts[1]),
609            _ => {
610                return Err(ConversionError::InvalidFormat(format!(
611                    "Invalid decimal format: {}",
612                    s
613                )));
614            }
615        };
616
617        // Parse the integer part
618        let mut result: i128 = integer_part.parse().map_err(|_| {
619            ConversionError::InvalidFormat(format!("Invalid integer part: {}", integer_part))
620        })?;
621
622        // Scale up by 10^scale
623        result = result
624            .checked_mul(10_i128.pow(scale as u32))
625            .ok_or_else(|| ConversionError::InvalidFormat("Decimal overflow".to_string()))?;
626
627        // Add the decimal part
628        if !decimal_part.is_empty() {
629            let decimal_digits = decimal_part.len().min(scale as usize);
630            let decimal_value: i128 = decimal_part[..decimal_digits].parse().map_err(|_| {
631                ConversionError::InvalidFormat(format!("Invalid decimal part: {}", decimal_part))
632            })?;
633
634            // Scale the decimal part appropriately
635            let scale_diff = scale as usize - decimal_digits;
636            let scaled_decimal = decimal_value * 10_i128.pow(scale_diff as u32);
637
638            result = result
639                .checked_add(if integer_part.starts_with('-') {
640                    -scaled_decimal
641                } else {
642                    scaled_decimal
643                })
644                .ok_or_else(|| ConversionError::InvalidFormat("Decimal overflow".to_string()))?;
645        }
646
647        Ok(result)
648    }
649}
650
/// Iterator over RecordBatches from a result set.
///
/// Lazily fetches data from the transport as needed. Batches already fetched
/// are buffered, so the iterator can replay them without re-contacting the
/// server.
pub struct ResultSetIterator {
    /// Result set handle; `None` when the server gave no continuation handle
    /// or the handle has been closed
    handle: Option<ResultSetHandle>,
    /// Transport reference used for fetches and for closing the handle
    transport: Arc<Mutex<dyn TransportProtocol>>,
    /// Query metadata (schema, row counts)
    metadata: QueryMetadata,
    /// Buffered batches, in arrival order
    batches: Vec<RecordBatch>,
    /// Index of the next buffered batch to hand out
    current_index: usize,
    /// Whether all data has been fetched from the server
    complete: bool,
}
668
669impl ResultSetIterator {
    /// Get the query metadata (schema and row statistics) for this stream.
    pub fn metadata(&self) -> &QueryMetadata {
        &self.metadata
    }
674
675    /// Fetch the next batch from the transport.
676    async fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>, QueryError> {
677        if self.complete {
678            return Ok(None);
679        }
680
681        let handle = match self.handle {
682            Some(h) => h,
683            None => {
684                self.complete = true;
685                return Ok(None);
686            }
687        };
688
689        let mut transport = self.transport.lock().await;
690        let result_data = transport
691            .fetch_results(handle)
692            .await
693            .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
694
695        if result_data.data.is_empty() {
696            self.complete = true;
697            return Ok(None);
698        }
699
700        let batch = ResultSet::column_major_to_record_batch(&result_data, &self.metadata.schema)
701            .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
702
703        Ok(Some(batch))
704    }
705
    /// Get the next batch synchronously (blocking).
    ///
    /// This is useful for implementing sync Iterator trait. Buffered batches
    /// are returned first; once exhausted, more data is fetched by blocking
    /// on the async fetch.
    ///
    /// NOTE(review): `tokio::runtime::Handle::block_on` panics if called from
    /// within an asynchronous execution context, so this method is only safe
    /// from a synchronous thread that happens to hold a runtime handle —
    /// confirm call sites before relying on it inside async code.
    pub fn next_batch(&mut self) -> Option<Result<RecordBatch, QueryError>> {
        // Return buffered batch if available
        if self.current_index < self.batches.len() {
            let batch = self.batches[self.current_index].clone();
            self.current_index += 1;
            return Some(Ok(batch));
        }

        // Need to fetch more data
        if self.complete {
            return None;
        }

        // Use block_on for async fetch (not ideal, but works for Phase 1)
        let runtime = tokio::runtime::Handle::try_current();
        if let Ok(handle) = runtime {
            let result = handle.block_on(self.fetch_next_batch());
            match result {
                Ok(Some(batch)) => {
                    // Buffer the batch so the iterator state stays consistent.
                    self.batches.push(batch.clone());
                    self.current_index += 1;
                    Some(Ok(batch))
                }
                Ok(None) => None,
                Err(e) => Some(Err(e)),
            }
        } else {
            // No runtime available
            Some(Err(QueryError::InvalidState(
                "No async runtime available".to_string(),
            )))
        }
    }
742
743    /// Close the result set and release resources.
744    pub async fn close(mut self) -> Result<(), QueryError> {
745        if let Some(handle) = self.handle.take() {
746            let mut transport = self.transport.lock().await;
747            transport
748                .close_result_set(handle)
749                .await
750                .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
751        }
752        Ok(())
753    }
754}
755
// Implement Iterator for ResultSetIterator.
// Delegates to `next_batch`, which may block while fetching from the transport.
impl Iterator for ResultSetIterator {
    /// One `RecordBatch`, or the error that aborted the stream.
    type Item = Result<RecordBatch, QueryError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_batch()
    }
}
764
765#[cfg(test)]
766mod tests {
767    use super::*;
768    use crate::transport::messages::{ColumnInfo, DataType};
769    use crate::transport::protocol::{
770        PreparedStatementHandle, QueryResult as TransportQueryResult,
771    };
772    use arrow::array::Array;
773    use async_trait::async_trait;
774    use mockall::mock;
775
    // mockall-generated `MockTransport` implementing `TransportProtocol`.
    // The method list must mirror the trait definition token-for-token.
    // The tests in this module only construct the mock and set no
    // expectations: none of the exercised code paths call into the
    // transport, so any invocation would panic and fail the test.
    mock! {
        pub Transport {}

        #[async_trait]
        impl TransportProtocol for Transport {
            async fn connect(&mut self, params: &crate::transport::protocol::ConnectionParams) -> Result<(), crate::error::TransportError>;
            async fn authenticate(&mut self, credentials: &crate::transport::protocol::Credentials) -> Result<crate::transport::messages::SessionInfo, crate::error::TransportError>;
            async fn execute_query(&mut self, sql: &str) -> Result<TransportQueryResult, crate::error::TransportError>;
            async fn fetch_results(&mut self, handle: ResultSetHandle) -> Result<ResultData, crate::error::TransportError>;
            async fn close_result_set(&mut self, handle: ResultSetHandle) -> Result<(), crate::error::TransportError>;
            async fn create_prepared_statement(&mut self, sql: &str) -> Result<PreparedStatementHandle, crate::error::TransportError>;
            async fn execute_prepared_statement(&mut self, handle: &PreparedStatementHandle, parameters: Option<Vec<Vec<serde_json::Value>>>) -> Result<TransportQueryResult, crate::error::TransportError>;
            async fn close_prepared_statement(&mut self, handle: &PreparedStatementHandle) -> Result<(), crate::error::TransportError>;
            async fn close(&mut self) -> Result<(), crate::error::TransportError>;
            fn is_connected(&self) -> bool;
        }
    }
794
795    #[tokio::test]
796    async fn test_result_set_row_count() {
797        let mock_transport = MockTransport::new();
798        let transport: Arc<Mutex<dyn TransportProtocol>> = Arc::new(Mutex::new(mock_transport));
799
800        let result = TransportQueryResult::RowCount { count: 42 };
801        let result_set = ResultSet::from_transport_result(result, transport).unwrap();
802
803        assert_eq!(result_set.row_count(), Some(42));
804        assert!(!result_set.is_stream());
805        assert!(result_set.metadata().is_none());
806    }
807
808    #[tokio::test]
809    async fn test_result_set_stream() {
810        let mock_transport = MockTransport::new();
811        let transport: Arc<Mutex<dyn TransportProtocol>> = Arc::new(Mutex::new(mock_transport));
812
813        // Row-major data format:
814        // Row 0: [1, "Alice"]
815        // Row 1: [2, "Bob"]
816        let data = ResultData {
817            columns: vec![
818                ColumnInfo {
819                    name: "id".to_string(),
820                    data_type: DataType {
821                        type_name: "DECIMAL".to_string(),
822                        precision: Some(18),
823                        scale: Some(0),
824                        size: None,
825                        character_set: None,
826                        with_local_time_zone: None,
827                        fraction: None,
828                    },
829                },
830                ColumnInfo {
831                    name: "name".to_string(),
832                    data_type: DataType {
833                        type_name: "VARCHAR".to_string(),
834                        precision: None,
835                        scale: None,
836                        size: Some(100),
837                        character_set: Some("UTF8".to_string()),
838                        with_local_time_zone: None,
839                        fraction: None,
840                    },
841                },
842            ],
843            data: vec![
844                vec![serde_json::json!(1), serde_json::json!("Alice")],
845                vec![serde_json::json!(2), serde_json::json!("Bob")],
846            ],
847            total_rows: 2,
848        };
849
850        let result = TransportQueryResult::ResultSet {
851            handle: Some(ResultSetHandle::new(1)),
852            data,
853        };
854
855        let result_set = ResultSet::from_transport_result(result, transport).unwrap();
856
857        assert!(result_set.row_count().is_none());
858        assert!(result_set.is_stream());
859
860        let metadata = result_set.metadata().unwrap();
861        assert_eq!(metadata.column_count, 2);
862        assert_eq!(metadata.total_rows, Some(2));
863        assert_eq!(metadata.column_names(), vec!["id", "name"]);
864    }
865
866    #[tokio::test]
867    async fn test_result_set_to_record_batch() {
868        let schema = Arc::new(Schema::new(vec![
869            Field::new("id", arrow::datatypes::DataType::Decimal128(18, 0), true),
870            Field::new("name", arrow::datatypes::DataType::Utf8, true),
871            Field::new("active", arrow::datatypes::DataType::Boolean, true),
872        ]));
873
874        // Row-major data format:
875        // Row 0: [1, "Alice", true]
876        // Row 1: [2, "Bob", false]
877        let data = ResultData {
878            columns: vec![],
879            data: vec![
880                vec![
881                    serde_json::json!(1),
882                    serde_json::json!("Alice"),
883                    serde_json::json!(true),
884                ],
885                vec![
886                    serde_json::json!(2),
887                    serde_json::json!("Bob"),
888                    serde_json::json!(false),
889                ],
890            ],
891            total_rows: 2,
892        };
893
894        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
895
896        assert_eq!(batch.num_rows(), 2);
897        assert_eq!(batch.num_columns(), 3);
898    }
899
900    #[tokio::test]
901    async fn test_single_row_single_column() {
902        // Test case for: SELECT 42 AS answer
903        let schema = Arc::new(Schema::new(vec![Field::new(
904            "answer",
905            arrow::datatypes::DataType::Decimal128(18, 0),
906            true,
907        )]));
908
909        // Row-major: one row with one value
910        let data = ResultData {
911            columns: vec![],
912            data: vec![
913                vec![serde_json::json!(42)], // Row 0: [42]
914            ],
915            total_rows: 1,
916        };
917
918        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
919
920        assert_eq!(batch.num_rows(), 1, "Should have exactly 1 row");
921        assert_eq!(batch.num_columns(), 1, "Should have exactly 1 column");
922    }
923
924    #[tokio::test]
925    async fn test_single_row_two_columns() {
926        // Test case for: SELECT 42 AS answer, 'hello' AS greeting
927        let schema = Arc::new(Schema::new(vec![
928            Field::new(
929                "answer",
930                arrow::datatypes::DataType::Decimal128(18, 0),
931                true,
932            ),
933            Field::new("greeting", arrow::datatypes::DataType::Utf8, true),
934        ]));
935
936        // Row-major: one row with two values
937        let data = ResultData {
938            columns: vec![],
939            data: vec![
940                vec![serde_json::json!(42), serde_json::json!("hello")], // Row 0: [42, "hello"]
941            ],
942            total_rows: 1,
943        };
944
945        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
946
947        assert_eq!(batch.num_rows(), 1, "Should have exactly 1 row");
948        assert_eq!(batch.num_columns(), 2, "Should have exactly 2 columns");
949    }
950
951    #[tokio::test]
952    async fn test_ten_rows_two_columns() {
953        // Test case for: SELECT LEVEL AS id, 'Row ' || LEVEL AS label FROM DUAL CONNECT BY LEVEL <= 10
954        let schema = Arc::new(Schema::new(vec![
955            Field::new("id", arrow::datatypes::DataType::Decimal128(18, 0), true),
956            Field::new("label", arrow::datatypes::DataType::Utf8, true),
957        ]));
958
959        // Row-major: ten rows, each with two values
960        let data = ResultData {
961            columns: vec![],
962            data: vec![
963                vec![serde_json::json!(1), serde_json::json!("Row 1")],
964                vec![serde_json::json!(2), serde_json::json!("Row 2")],
965                vec![serde_json::json!(3), serde_json::json!("Row 3")],
966                vec![serde_json::json!(4), serde_json::json!("Row 4")],
967                vec![serde_json::json!(5), serde_json::json!("Row 5")],
968                vec![serde_json::json!(6), serde_json::json!("Row 6")],
969                vec![serde_json::json!(7), serde_json::json!("Row 7")],
970                vec![serde_json::json!(8), serde_json::json!("Row 8")],
971                vec![serde_json::json!(9), serde_json::json!("Row 9")],
972                vec![serde_json::json!(10), serde_json::json!("Row 10")],
973            ],
974            total_rows: 10,
975        };
976
977        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
978
979        assert_eq!(batch.num_rows(), 10, "Should have exactly 10 rows");
980        assert_eq!(batch.num_columns(), 2, "Should have exactly 2 columns");
981    }
982
983    #[tokio::test]
984    async fn test_schema_building() {
985        let columns = vec![
986            ColumnInfo {
987                name: "id".to_string(),
988                data_type: DataType {
989                    type_name: "DECIMAL".to_string(),
990                    precision: Some(18),
991                    scale: Some(0),
992                    size: None,
993                    character_set: None,
994                    with_local_time_zone: None,
995                    fraction: None,
996                },
997            },
998            ColumnInfo {
999                name: "name".to_string(),
1000                data_type: DataType {
1001                    type_name: "VARCHAR".to_string(),
1002                    precision: None,
1003                    scale: None,
1004                    size: Some(100),
1005                    character_set: Some("UTF8".to_string()),
1006                    with_local_time_zone: None,
1007                    fraction: None,
1008                },
1009            },
1010            ColumnInfo {
1011                name: "created_at".to_string(),
1012                data_type: DataType {
1013                    type_name: "TIMESTAMP".to_string(),
1014                    precision: None,
1015                    scale: None,
1016                    size: None,
1017                    character_set: None,
1018                    with_local_time_zone: Some(false),
1019                    fraction: None,
1020                },
1021            },
1022        ];
1023
1024        let schema = ResultSet::build_schema(&columns).unwrap();
1025
1026        assert_eq!(schema.fields().len(), 3);
1027        assert_eq!(schema.field(0).name(), "id");
1028        assert_eq!(schema.field(1).name(), "name");
1029        assert_eq!(schema.field(2).name(), "created_at");
1030    }
1031
1032    #[test]
1033    fn test_query_metadata() {
1034        let schema = Arc::new(Schema::new(vec![
1035            Field::new("id", arrow::datatypes::DataType::Int64, false),
1036            Field::new("name", arrow::datatypes::DataType::Utf8, true),
1037        ]));
1038
1039        let metadata = QueryMetadata::new(Arc::clone(&schema), Some(100)).with_execution_time(250);
1040
1041        assert_eq!(metadata.column_count, 2);
1042        assert_eq!(metadata.total_rows, Some(100));
1043        assert_eq!(metadata.execution_time_ms, Some(250));
1044        assert_eq!(metadata.column_names(), vec!["id", "name"]);
1045    }
1046
1047    // =========================================================================
1048    // Tests for QueryMetadata::column_types
1049    // =========================================================================
1050
1051    #[test]
1052    fn test_query_metadata_column_types_returns_correct_types() {
1053        let schema = Arc::new(Schema::new(vec![
1054            Field::new("id", arrow::datatypes::DataType::Int64, false),
1055            Field::new("name", arrow::datatypes::DataType::Utf8, true),
1056            Field::new("active", arrow::datatypes::DataType::Boolean, true),
1057        ]));
1058
1059        let metadata = QueryMetadata::new(Arc::clone(&schema), Some(100));
1060        let types = metadata.column_types();
1061
1062        assert_eq!(types.len(), 3);
1063        assert_eq!(types[0], &arrow::datatypes::DataType::Int64);
1064        assert_eq!(types[1], &arrow::datatypes::DataType::Utf8);
1065        assert_eq!(types[2], &arrow::datatypes::DataType::Boolean);
1066    }
1067
1068    #[test]
1069    fn test_query_metadata_column_types_empty_schema() {
1070        let fields: Vec<Field> = vec![];
1071        let schema = Arc::new(Schema::new(fields));
1072
1073        let metadata = QueryMetadata::new(Arc::clone(&schema), None);
1074        let types = metadata.column_types();
1075
1076        assert!(types.is_empty());
1077    }
1078
1079    // =========================================================================
1080    // Tests for ResultSet::parse_date_to_days
1081    // =========================================================================
1082
1083    #[test]
1084    fn test_parse_date_to_days_unix_epoch() {
1085        let result = ResultSet::parse_date_to_days("1970-01-01").unwrap();
1086        assert_eq!(result, 0);
1087    }
1088
1089    #[test]
1090    fn test_parse_date_to_days_after_epoch() {
1091        let result = ResultSet::parse_date_to_days("1970-01-02").unwrap();
1092        assert_eq!(result, 1);
1093    }
1094
1095    #[test]
1096    fn test_parse_date_to_days_year_2000() {
1097        // 2000-01-01 is 10957 days after Unix epoch
1098        let result = ResultSet::parse_date_to_days("2000-01-01").unwrap();
1099        assert_eq!(result, 10957);
1100    }
1101
1102    #[test]
1103    fn test_parse_date_to_days_leap_year() {
1104        // 2000-03-01 should include Feb 29 (leap year)
1105        let result = ResultSet::parse_date_to_days("2000-03-01").unwrap();
1106        // 2000-01-01 = 10957, Jan has 31 days, Feb has 29 days (leap year)
1107        // 10957 + 31 + 29 = 11017
1108        assert_eq!(result, 11017);
1109    }
1110
1111    #[test]
1112    fn test_parse_date_to_days_non_leap_year() {
1113        // 2001-03-01 should NOT include Feb 29 (non-leap year)
1114        let result = ResultSet::parse_date_to_days("2001-03-01").unwrap();
1115        // 2001-01-01 = 11323, Jan has 31 days, Feb has 28 days
1116        // 11323 + 31 + 28 = 11382
1117        assert_eq!(result, 11382);
1118    }
1119
1120    #[test]
1121    fn test_parse_date_to_days_before_epoch() {
1122        // 1969-12-31 is -1 day before Unix epoch
1123        let result = ResultSet::parse_date_to_days("1969-12-31").unwrap();
1124        assert_eq!(result, -1);
1125    }
1126
1127    #[test]
1128    fn test_parse_date_to_days_invalid_format_wrong_separator() {
1129        let result = ResultSet::parse_date_to_days("2000/01/01");
1130        assert!(result.is_err());
1131    }
1132
1133    #[test]
1134    fn test_parse_date_to_days_invalid_format_missing_parts() {
1135        let result = ResultSet::parse_date_to_days("2000-01");
1136        assert!(result.is_err());
1137    }
1138
1139    #[test]
1140    fn test_parse_date_to_days_invalid_month_zero() {
1141        let result = ResultSet::parse_date_to_days("2000-00-01");
1142        assert!(result.is_err());
1143    }
1144
1145    #[test]
1146    fn test_parse_date_to_days_invalid_month_thirteen() {
1147        let result = ResultSet::parse_date_to_days("2000-13-01");
1148        assert!(result.is_err());
1149    }
1150
1151    #[test]
1152    fn test_parse_date_to_days_invalid_day_zero() {
1153        let result = ResultSet::parse_date_to_days("2000-01-00");
1154        assert!(result.is_err());
1155    }
1156
1157    #[test]
1158    fn test_parse_date_to_days_invalid_day_thirty_two() {
1159        let result = ResultSet::parse_date_to_days("2000-01-32");
1160        assert!(result.is_err());
1161    }
1162
1163    #[test]
1164    fn test_parse_date_to_days_invalid_non_numeric() {
1165        let result = ResultSet::parse_date_to_days("YYYY-MM-DD");
1166        assert!(result.is_err());
1167    }
1168
1169    // =========================================================================
1170    // Tests for ResultSet::parse_timestamp_to_micros
1171    // =========================================================================
1172
1173    #[test]
1174    fn test_parse_timestamp_to_micros_date_only() {
1175        let result = ResultSet::parse_timestamp_to_micros("1970-01-01").unwrap();
1176        assert_eq!(result, 0);
1177    }
1178
1179    #[test]
1180    fn test_parse_timestamp_to_micros_with_time() {
1181        // 1970-01-01 01:00:00 = 1 hour = 3600 * 1_000_000 microseconds
1182        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 01:00:00").unwrap();
1183        assert_eq!(result, 3600 * 1_000_000);
1184    }
1185
1186    #[test]
1187    fn test_parse_timestamp_to_micros_with_minutes() {
1188        // 1970-01-01 00:30:00 = 30 minutes = 30 * 60 * 1_000_000 microseconds
1189        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:30:00").unwrap();
1190        assert_eq!(result, 30 * 60 * 1_000_000);
1191    }
1192
1193    #[test]
1194    fn test_parse_timestamp_to_micros_with_seconds() {
1195        // 1970-01-01 00:00:45 = 45 seconds = 45 * 1_000_000 microseconds
1196        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:45").unwrap();
1197        assert_eq!(result, 45 * 1_000_000);
1198    }
1199
1200    #[test]
1201    fn test_parse_timestamp_to_micros_with_fractional_seconds_3_digits() {
1202        // 1970-01-01 00:00:00.123 = 123000 microseconds
1203        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:00.123").unwrap();
1204        assert_eq!(result, 123000);
1205    }
1206
1207    #[test]
1208    fn test_parse_timestamp_to_micros_with_fractional_seconds_6_digits() {
1209        // 1970-01-01 00:00:00.123456 = 123456 microseconds
1210        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:00.123456").unwrap();
1211        assert_eq!(result, 123456);
1212    }
1213
1214    #[test]
1215    fn test_parse_timestamp_to_micros_with_fractional_seconds_more_than_6_digits() {
1216        // 1970-01-01 00:00:00.1234567 should truncate to 123456 microseconds
1217        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:00.1234567").unwrap();
1218        assert_eq!(result, 123456);
1219    }
1220
1221    #[test]
1222    fn test_parse_timestamp_to_micros_with_fractional_seconds_1_digit() {
1223        // 1970-01-01 00:00:00.1 = 100000 microseconds
1224        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 00:00:00.1").unwrap();
1225        assert_eq!(result, 100000);
1226    }
1227
1228    #[test]
1229    fn test_parse_timestamp_to_micros_complex_timestamp() {
1230        // 2000-06-15 12:30:45.500
1231        // days = 11124 (from previous calculation for 2000-06-15)
1232        // time = 12*3600 + 30*60 + 45 seconds = 45045 seconds
1233        // micros from time = 45045 * 1_000_000 + 500000
1234        let result = ResultSet::parse_timestamp_to_micros("2000-06-15 12:30:45.500").unwrap();
1235
1236        // Calculate expected value
1237        let days_micros: i64 =
1238            ResultSet::parse_date_to_days("2000-06-15").unwrap() as i64 * 86400 * 1_000_000;
1239        let time_micros: i64 = (12 * 3600 + 30 * 60 + 45) * 1_000_000 + 500000;
1240        assert_eq!(result, days_micros + time_micros);
1241    }
1242
1243    #[test]
1244    fn test_parse_timestamp_to_micros_empty_string() {
1245        let result = ResultSet::parse_timestamp_to_micros("");
1246        assert!(result.is_err());
1247    }
1248
1249    #[test]
1250    fn test_parse_timestamp_to_micros_hours_and_minutes_only() {
1251        // 1970-01-01 01:30 (without seconds)
1252        // hours + minutes = 1*3600 + 30*60 = 5400 seconds = 5400 * 1_000_000 microseconds
1253        let result = ResultSet::parse_timestamp_to_micros("1970-01-01 01:30").unwrap();
1254        assert_eq!(result, 5400 * 1_000_000);
1255    }
1256
1257    // =========================================================================
1258    // Tests for ResultSet::parse_string_to_decimal
1259    // =========================================================================
1260
1261    #[test]
1262    fn test_parse_string_to_decimal_integer() {
1263        let result = ResultSet::parse_string_to_decimal("123", 2).unwrap();
1264        // 123 with scale 2 = 12300
1265        assert_eq!(result, 12300);
1266    }
1267
1268    #[test]
1269    fn test_parse_string_to_decimal_with_decimal_point() {
1270        let result = ResultSet::parse_string_to_decimal("123.45", 2).unwrap();
1271        // 123.45 with scale 2 = 12345
1272        assert_eq!(result, 12345);
1273    }
1274
1275    #[test]
1276    fn test_parse_string_to_decimal_negative() {
1277        let result = ResultSet::parse_string_to_decimal("-123.45", 2).unwrap();
1278        // -123.45 with scale 2 = -12345
1279        assert_eq!(result, -12345);
1280    }
1281
1282    #[test]
1283    fn test_parse_string_to_decimal_zero_scale() {
1284        let result = ResultSet::parse_string_to_decimal("123", 0).unwrap();
1285        assert_eq!(result, 123);
1286    }
1287
1288    #[test]
1289    fn test_parse_string_to_decimal_high_scale() {
1290        let result = ResultSet::parse_string_to_decimal("1.5", 6).unwrap();
1291        // 1.5 with scale 6 = 1500000
1292        assert_eq!(result, 1500000);
1293    }
1294
1295    #[test]
1296    fn test_parse_string_to_decimal_truncates_extra_decimals() {
1297        // When decimal part has more digits than scale
1298        let result = ResultSet::parse_string_to_decimal("1.123456", 3).unwrap();
1299        // 1.123 with scale 3 = 1123 (truncates extra digits)
1300        assert_eq!(result, 1123);
1301    }
1302
1303    #[test]
1304    fn test_parse_string_to_decimal_zero() {
1305        let result = ResultSet::parse_string_to_decimal("0", 2).unwrap();
1306        assert_eq!(result, 0);
1307    }
1308
1309    #[test]
1310    fn test_parse_string_to_decimal_negative_zero() {
1311        let result = ResultSet::parse_string_to_decimal("-0", 2).unwrap();
1312        assert_eq!(result, 0);
1313    }
1314
1315    #[test]
1316    fn test_parse_string_to_decimal_empty_string() {
1317        let result = ResultSet::parse_string_to_decimal("", 2);
1318        assert!(result.is_err());
1319    }
1320
1321    #[test]
1322    fn test_parse_string_to_decimal_multiple_decimal_points() {
1323        let result = ResultSet::parse_string_to_decimal("1.2.3", 2);
1324        assert!(result.is_err());
1325    }
1326
1327    #[test]
1328    fn test_parse_string_to_decimal_non_numeric() {
1329        let result = ResultSet::parse_string_to_decimal("abc", 2);
1330        assert!(result.is_err());
1331    }
1332
1333    #[test]
1334    fn test_parse_string_to_decimal_large_value() {
1335        let result = ResultSet::parse_string_to_decimal("999999999999999999", 0).unwrap();
1336        assert_eq!(result, 999999999999999999_i128);
1337    }
1338
1339    // =========================================================================
1340    // Tests for ResultSet::exasol_datatype_to_arrow
1341    // =========================================================================
1342
1343    #[test]
1344    fn test_exasol_datatype_to_arrow_boolean() {
1345        let data_type = DataType {
1346            type_name: "BOOLEAN".to_string(),
1347            precision: None,
1348            scale: None,
1349            size: None,
1350            character_set: None,
1351            with_local_time_zone: None,
1352            fraction: None,
1353        };
1354
1355        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1356        assert_eq!(result, arrow::datatypes::DataType::Boolean);
1357    }
1358
1359    #[test]
1360    fn test_exasol_datatype_to_arrow_char() {
1361        let data_type = DataType {
1362            type_name: "CHAR".to_string(),
1363            precision: None,
1364            scale: None,
1365            size: Some(10),
1366            character_set: None,
1367            with_local_time_zone: None,
1368            fraction: None,
1369        };
1370
1371        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1372        assert_eq!(result, arrow::datatypes::DataType::Utf8);
1373    }
1374
1375    #[test]
1376    fn test_exasol_datatype_to_arrow_char_default_size() {
1377        let data_type = DataType {
1378            type_name: "CHAR".to_string(),
1379            precision: None,
1380            scale: None,
1381            size: None, // Should default to 1
1382            character_set: None,
1383            with_local_time_zone: None,
1384            fraction: None,
1385        };
1386
1387        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1388        assert_eq!(result, arrow::datatypes::DataType::Utf8);
1389    }
1390
1391    #[test]
1392    fn test_exasol_datatype_to_arrow_varchar() {
1393        let data_type = DataType {
1394            type_name: "VARCHAR".to_string(),
1395            precision: None,
1396            scale: None,
1397            size: Some(100),
1398            character_set: None,
1399            with_local_time_zone: None,
1400            fraction: None,
1401        };
1402
1403        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1404        assert_eq!(result, arrow::datatypes::DataType::Utf8);
1405    }
1406
1407    #[test]
1408    fn test_exasol_datatype_to_arrow_varchar_default_size() {
1409        let data_type = DataType {
1410            type_name: "VARCHAR".to_string(),
1411            precision: None,
1412            scale: None,
1413            size: None, // Should default to 2000000
1414            character_set: None,
1415            with_local_time_zone: None,
1416            fraction: None,
1417        };
1418
1419        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1420        assert_eq!(result, arrow::datatypes::DataType::Utf8);
1421    }
1422
1423    #[test]
1424    fn test_exasol_datatype_to_arrow_decimal() {
1425        let data_type = DataType {
1426            type_name: "DECIMAL".to_string(),
1427            precision: Some(18),
1428            scale: Some(2),
1429            size: None,
1430            character_set: None,
1431            with_local_time_zone: None,
1432            fraction: None,
1433        };
1434
1435        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1436        assert_eq!(result, arrow::datatypes::DataType::Decimal128(18, 2));
1437    }
1438
1439    #[test]
1440    fn test_exasol_datatype_to_arrow_decimal_default_precision_scale() {
1441        let data_type = DataType {
1442            type_name: "DECIMAL".to_string(),
1443            precision: None, // Should default to 18
1444            scale: None,     // Should default to 0
1445            size: None,
1446            character_set: None,
1447            with_local_time_zone: None,
1448            fraction: None,
1449        };
1450
1451        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1452        assert_eq!(result, arrow::datatypes::DataType::Decimal128(18, 0));
1453    }
1454
1455    #[test]
1456    fn test_exasol_datatype_to_arrow_double() {
1457        let data_type = DataType {
1458            type_name: "DOUBLE".to_string(),
1459            precision: None,
1460            scale: None,
1461            size: None,
1462            character_set: None,
1463            with_local_time_zone: None,
1464            fraction: None,
1465        };
1466
1467        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1468        assert_eq!(result, arrow::datatypes::DataType::Float64);
1469    }
1470
1471    #[test]
1472    fn test_exasol_datatype_to_arrow_date() {
1473        let data_type = DataType {
1474            type_name: "DATE".to_string(),
1475            precision: None,
1476            scale: None,
1477            size: None,
1478            character_set: None,
1479            with_local_time_zone: None,
1480            fraction: None,
1481        };
1482
1483        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1484        assert_eq!(result, arrow::datatypes::DataType::Date32);
1485    }
1486
1487    #[test]
1488    fn test_exasol_datatype_to_arrow_timestamp_without_tz() {
1489        let data_type = DataType {
1490            type_name: "TIMESTAMP".to_string(),
1491            precision: None,
1492            scale: None,
1493            size: None,
1494            character_set: None,
1495            with_local_time_zone: Some(false),
1496            fraction: None,
1497        };
1498
1499        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1500        assert!(matches!(
1501            result,
1502            arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None)
1503        ));
1504    }
1505
1506    #[test]
1507    fn test_exasol_datatype_to_arrow_timestamp_with_tz() {
1508        let data_type = DataType {
1509            type_name: "TIMESTAMP".to_string(),
1510            precision: None,
1511            scale: None,
1512            size: None,
1513            character_set: None,
1514            with_local_time_zone: Some(true),
1515            fraction: None,
1516        };
1517
1518        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1519        assert!(matches!(
1520            result,
1521            arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, Some(_))
1522        ));
1523    }
1524
1525    #[test]
1526    fn test_exasol_datatype_to_arrow_timestamp_default_tz() {
1527        let data_type = DataType {
1528            type_name: "TIMESTAMP".to_string(),
1529            precision: None,
1530            scale: None,
1531            size: None,
1532            character_set: None,
1533            with_local_time_zone: None, // Should default to false
1534            fraction: None,
1535        };
1536
1537        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1538        assert!(matches!(
1539            result,
1540            arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None)
1541        ));
1542    }
1543
1544    #[test]
1545    fn test_exasol_datatype_to_arrow_interval_year_to_month() {
1546        let data_type = DataType {
1547            type_name: "INTERVAL YEAR TO MONTH".to_string(),
1548            precision: None,
1549            scale: None,
1550            size: None,
1551            character_set: None,
1552            with_local_time_zone: None,
1553            fraction: None,
1554        };
1555
1556        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1557        assert!(matches!(
1558            result,
1559            arrow::datatypes::DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano)
1560        ));
1561    }
1562
1563    #[test]
1564    fn test_exasol_datatype_to_arrow_interval_day_to_second() {
1565        let data_type = DataType {
1566            type_name: "INTERVAL DAY TO SECOND".to_string(),
1567            precision: None,
1568            scale: None,
1569            size: None,
1570            character_set: None,
1571            with_local_time_zone: None,
1572            fraction: Some(6),
1573        };
1574
1575        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1576        assert!(matches!(
1577            result,
1578            arrow::datatypes::DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano)
1579        ));
1580    }
1581
1582    #[test]
1583    fn test_exasol_datatype_to_arrow_interval_day_to_second_default_fraction() {
1584        let data_type = DataType {
1585            type_name: "INTERVAL DAY TO SECOND".to_string(),
1586            precision: None,
1587            scale: None,
1588            size: None,
1589            character_set: None,
1590            with_local_time_zone: None,
1591            fraction: None, // Should default to 3
1592        };
1593
1594        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1595        assert!(matches!(
1596            result,
1597            arrow::datatypes::DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano)
1598        ));
1599    }
1600
1601    #[test]
1602    fn test_exasol_datatype_to_arrow_geometry() {
1603        let data_type = DataType {
1604            type_name: "GEOMETRY".to_string(),
1605            precision: None,
1606            scale: None,
1607            size: None,
1608            character_set: None,
1609            with_local_time_zone: None,
1610            fraction: None,
1611        };
1612
1613        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1614        assert_eq!(result, arrow::datatypes::DataType::Binary);
1615    }
1616
1617    #[test]
1618    fn test_exasol_datatype_to_arrow_hashtype() {
1619        let data_type = DataType {
1620            type_name: "HASHTYPE".to_string(),
1621            precision: None,
1622            scale: None,
1623            size: None,
1624            character_set: None,
1625            with_local_time_zone: None,
1626            fraction: None,
1627        };
1628
1629        let result = ResultSet::exasol_datatype_to_arrow(&data_type).unwrap();
1630        assert_eq!(result, arrow::datatypes::DataType::Binary);
1631    }
1632
1633    #[test]
1634    fn test_exasol_datatype_to_arrow_unsupported_type() {
1635        let data_type = DataType {
1636            type_name: "UNKNOWN_TYPE".to_string(),
1637            precision: None,
1638            scale: None,
1639            size: None,
1640            character_set: None,
1641            with_local_time_zone: None,
1642            fraction: None,
1643        };
1644
1645        let result = ResultSet::exasol_datatype_to_arrow(&data_type);
1646        assert!(result.is_err());
1647    }
1648
1649    // =========================================================================
1650    // Tests for ResultSet::column_major_to_record_batch with various types
1651    // =========================================================================
1652
1653    #[tokio::test]
1654    async fn test_column_major_to_record_batch_with_int32() {
1655        let schema = Arc::new(Schema::new(vec![Field::new(
1656            "value",
1657            arrow::datatypes::DataType::Int32,
1658            true,
1659        )]));
1660
1661        let data = ResultData {
1662            columns: vec![],
1663            data: vec![
1664                vec![serde_json::json!(1)],
1665                vec![serde_json::json!(2)],
1666                vec![serde_json::json!(null)],
1667                vec![serde_json::json!(4)],
1668            ],
1669            total_rows: 4,
1670        };
1671
1672        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1673
1674        assert_eq!(batch.num_rows(), 4);
1675        assert_eq!(batch.num_columns(), 1);
1676
1677        let array = batch
1678            .column(0)
1679            .as_any()
1680            .downcast_ref::<arrow::array::Int32Array>()
1681            .unwrap();
1682        assert_eq!(array.value(0), 1);
1683        assert_eq!(array.value(1), 2);
1684        assert!(array.is_null(2));
1685        assert_eq!(array.value(3), 4);
1686    }
1687
1688    #[tokio::test]
1689    async fn test_column_major_to_record_batch_with_int64() {
1690        let schema = Arc::new(Schema::new(vec![Field::new(
1691            "value",
1692            arrow::datatypes::DataType::Int64,
1693            true,
1694        )]));
1695
1696        let data = ResultData {
1697            columns: vec![],
1698            data: vec![
1699                vec![serde_json::json!(9223372036854775807_i64)], // Max i64
1700                vec![serde_json::json!(-9223372036854775808_i64)], // Min i64
1701                vec![serde_json::json!(null)],
1702            ],
1703            total_rows: 3,
1704        };
1705
1706        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1707
1708        assert_eq!(batch.num_rows(), 3);
1709
1710        let array = batch
1711            .column(0)
1712            .as_any()
1713            .downcast_ref::<arrow::array::Int64Array>()
1714            .unwrap();
1715        assert_eq!(array.value(0), 9223372036854775807_i64);
1716        assert_eq!(array.value(1), -9223372036854775808_i64);
1717        assert!(array.is_null(2));
1718    }
1719
1720    #[tokio::test]
1721    async fn test_column_major_to_record_batch_with_float64() {
1722        let schema = Arc::new(Schema::new(vec![Field::new(
1723            "value",
1724            arrow::datatypes::DataType::Float64,
1725            true,
1726        )]));
1727
1728        let data = ResultData {
1729            columns: vec![],
1730            data: vec![
1731                vec![serde_json::json!(1.23456)],
1732                vec![serde_json::json!(-9.87654)],
1733                vec![serde_json::json!(null)],
1734                vec![serde_json::json!(0.0)],
1735            ],
1736            total_rows: 4,
1737        };
1738
1739        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1740
1741        assert_eq!(batch.num_rows(), 4);
1742
1743        let array = batch
1744            .column(0)
1745            .as_any()
1746            .downcast_ref::<arrow::array::Float64Array>()
1747            .unwrap();
1748        assert!((array.value(0) - 1.23456).abs() < 0.00001);
1749        assert!((array.value(1) - (-9.87654)).abs() < 0.00001);
1750        assert!(array.is_null(2));
1751        assert!((array.value(3) - 0.0).abs() < 0.00001);
1752    }
1753
1754    #[tokio::test]
1755    async fn test_column_major_to_record_batch_with_boolean() {
1756        let schema = Arc::new(Schema::new(vec![Field::new(
1757            "flag",
1758            arrow::datatypes::DataType::Boolean,
1759            true,
1760        )]));
1761
1762        let data = ResultData {
1763            columns: vec![],
1764            data: vec![
1765                vec![serde_json::json!(true)],
1766                vec![serde_json::json!(false)],
1767                vec![serde_json::json!(null)],
1768            ],
1769            total_rows: 3,
1770        };
1771
1772        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1773
1774        assert_eq!(batch.num_rows(), 3);
1775
1776        let array = batch
1777            .column(0)
1778            .as_any()
1779            .downcast_ref::<arrow::array::BooleanArray>()
1780            .unwrap();
1781        assert!(array.value(0));
1782        assert!(!array.value(1));
1783        assert!(array.is_null(2));
1784    }
1785
1786    #[tokio::test]
1787    async fn test_column_major_to_record_batch_with_date32() {
1788        let schema = Arc::new(Schema::new(vec![Field::new(
1789            "date",
1790            arrow::datatypes::DataType::Date32,
1791            true,
1792        )]));
1793
1794        let data = ResultData {
1795            columns: vec![],
1796            data: vec![
1797                vec![serde_json::json!("1970-01-01")],
1798                vec![serde_json::json!("2000-01-01")],
1799                vec![serde_json::json!(null)],
1800            ],
1801            total_rows: 3,
1802        };
1803
1804        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1805
1806        assert_eq!(batch.num_rows(), 3);
1807
1808        let array = batch
1809            .column(0)
1810            .as_any()
1811            .downcast_ref::<arrow::array::Date32Array>()
1812            .unwrap();
1813        assert_eq!(array.value(0), 0); // Unix epoch
1814        assert_eq!(array.value(1), 10957); // 2000-01-01
1815        assert!(array.is_null(2));
1816    }
1817
1818    #[tokio::test]
1819    async fn test_column_major_to_record_batch_with_timestamp() {
1820        let schema = Arc::new(Schema::new(vec![Field::new(
1821            "timestamp",
1822            arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
1823            true,
1824        )]));
1825
1826        let data = ResultData {
1827            columns: vec![],
1828            data: vec![
1829                vec![serde_json::json!("1970-01-01 00:00:00")],
1830                vec![serde_json::json!("1970-01-01 01:00:00.123456")],
1831                vec![serde_json::json!(null)],
1832            ],
1833            total_rows: 3,
1834        };
1835
1836        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1837
1838        assert_eq!(batch.num_rows(), 3);
1839
1840        let array = batch
1841            .column(0)
1842            .as_any()
1843            .downcast_ref::<arrow::array::TimestampMicrosecondArray>()
1844            .unwrap();
1845        assert_eq!(array.value(0), 0);
1846        // 1 hour + 123456 microseconds
1847        assert_eq!(array.value(1), 3600 * 1_000_000 + 123456);
1848        assert!(array.is_null(2));
1849    }
1850
1851    #[tokio::test]
1852    async fn test_column_major_to_record_batch_empty_data() {
1853        let schema = Arc::new(Schema::new(vec![
1854            Field::new("id", arrow::datatypes::DataType::Int64, true),
1855            Field::new("name", arrow::datatypes::DataType::Utf8, true),
1856        ]));
1857
1858        let data = ResultData {
1859            columns: vec![],
1860            data: vec![],
1861            total_rows: 0,
1862        };
1863
1864        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1865
1866        assert_eq!(batch.num_rows(), 0);
1867        assert_eq!(batch.num_columns(), 2);
1868    }
1869
1870    #[tokio::test]
1871    async fn test_column_major_to_record_batch_with_utf8_string() {
1872        let schema = Arc::new(Schema::new(vec![Field::new(
1873            "text",
1874            arrow::datatypes::DataType::Utf8,
1875            true,
1876        )]));
1877
1878        let data = ResultData {
1879            columns: vec![],
1880            data: vec![
1881                vec![serde_json::json!("hello")],
1882                vec![serde_json::json!("world")],
1883                vec![serde_json::json!(null)],
1884                vec![serde_json::json!("")], // Empty string
1885            ],
1886            total_rows: 4,
1887        };
1888
1889        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1890
1891        assert_eq!(batch.num_rows(), 4);
1892
1893        let array = batch
1894            .column(0)
1895            .as_any()
1896            .downcast_ref::<arrow::array::StringArray>()
1897            .unwrap();
1898        assert_eq!(array.value(0), "hello");
1899        assert_eq!(array.value(1), "world");
1900        assert!(array.is_null(2));
1901        assert_eq!(array.value(3), "");
1902    }
1903
1904    #[tokio::test]
1905    async fn test_column_major_to_record_batch_utf8_from_non_string_value() {
1906        // Test that non-string JSON values are converted to string
1907        let schema = Arc::new(Schema::new(vec![Field::new(
1908            "text",
1909            arrow::datatypes::DataType::Utf8,
1910            true,
1911        )]));
1912
1913        let data = ResultData {
1914            columns: vec![],
1915            data: vec![
1916                vec![serde_json::json!(123)], // Number should be converted to "123"
1917            ],
1918            total_rows: 1,
1919        };
1920
1921        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1922
1923        let array = batch
1924            .column(0)
1925            .as_any()
1926            .downcast_ref::<arrow::array::StringArray>()
1927            .unwrap();
1928        assert_eq!(array.value(0), "123");
1929    }
1930
1931    #[tokio::test]
1932    async fn test_column_major_to_record_batch_decimal_from_string() {
1933        let schema = Arc::new(Schema::new(vec![Field::new(
1934            "amount",
1935            arrow::datatypes::DataType::Decimal128(18, 2),
1936            true,
1937        )]));
1938
1939        let data = ResultData {
1940            columns: vec![],
1941            data: vec![
1942                vec![serde_json::json!("123.45")],
1943                vec![serde_json::json!("-67.89")],
1944                vec![serde_json::json!(null)],
1945            ],
1946            total_rows: 3,
1947        };
1948
1949        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1950
1951        assert_eq!(batch.num_rows(), 3);
1952
1953        let array = batch
1954            .column(0)
1955            .as_any()
1956            .downcast_ref::<arrow::array::Decimal128Array>()
1957            .unwrap();
1958        assert_eq!(array.value(0), 12345); // 123.45 with scale 2
1959        assert_eq!(array.value(1), -6789); // -67.89 with scale 2
1960        assert!(array.is_null(2));
1961    }
1962
1963    #[tokio::test]
1964    async fn test_column_major_to_record_batch_decimal_from_integer() {
1965        let schema = Arc::new(Schema::new(vec![Field::new(
1966            "amount",
1967            arrow::datatypes::DataType::Decimal128(18, 2),
1968            true,
1969        )]));
1970
1971        let data = ResultData {
1972            columns: vec![],
1973            data: vec![
1974                vec![serde_json::json!(100)], // Integer should be scaled
1975            ],
1976            total_rows: 1,
1977        };
1978
1979        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
1980
1981        let array = batch
1982            .column(0)
1983            .as_any()
1984            .downcast_ref::<arrow::array::Decimal128Array>()
1985            .unwrap();
1986        assert_eq!(array.value(0), 10000); // 100 with scale 2 = 10000
1987    }
1988
1989    #[tokio::test]
1990    async fn test_column_major_to_record_batch_decimal_from_float() {
1991        let schema = Arc::new(Schema::new(vec![Field::new(
1992            "amount",
1993            arrow::datatypes::DataType::Decimal128(18, 2),
1994            true,
1995        )]));
1996
1997        let data = ResultData {
1998            columns: vec![],
1999            data: vec![
2000                vec![serde_json::json!(99.99)], // Float should be scaled
2001            ],
2002            total_rows: 1,
2003        };
2004
2005        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2006
2007        let array = batch
2008            .column(0)
2009            .as_any()
2010            .downcast_ref::<arrow::array::Decimal128Array>()
2011            .unwrap();
2012        assert_eq!(array.value(0), 9999); // 99.99 with scale 2 = 9999
2013    }
2014
2015    #[tokio::test]
2016    async fn test_column_major_to_record_batch_invalid_date_becomes_null() {
2017        let schema = Arc::new(Schema::new(vec![Field::new(
2018            "date",
2019            arrow::datatypes::DataType::Date32,
2020            true,
2021        )]));
2022
2023        let data = ResultData {
2024            columns: vec![],
2025            data: vec![
2026                vec![serde_json::json!("invalid-date")],
2027                vec![serde_json::json!(123)], // Non-string should become null
2028            ],
2029            total_rows: 2,
2030        };
2031
2032        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2033
2034        let array = batch
2035            .column(0)
2036            .as_any()
2037            .downcast_ref::<arrow::array::Date32Array>()
2038            .unwrap();
2039        // Invalid date formats become null
2040        assert!(array.is_null(0));
2041        assert!(array.is_null(1));
2042    }
2043
2044    #[tokio::test]
2045    async fn test_column_major_to_record_batch_invalid_timestamp_becomes_null() {
2046        let schema = Arc::new(Schema::new(vec![Field::new(
2047            "timestamp",
2048            arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
2049            true,
2050        )]));
2051
2052        let data = ResultData {
2053            columns: vec![],
2054            data: vec![
2055                vec![serde_json::json!("not-a-timestamp")],
2056                vec![serde_json::json!(12345)], // Non-string should become null
2057            ],
2058            total_rows: 2,
2059        };
2060
2061        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2062
2063        let array = batch
2064            .column(0)
2065            .as_any()
2066            .downcast_ref::<arrow::array::TimestampMicrosecondArray>()
2067            .unwrap();
2068        assert!(array.is_null(0));
2069        assert!(array.is_null(1));
2070    }
2071
2072    #[tokio::test]
2073    async fn test_column_major_to_record_batch_boolean_null_from_non_bool() {
2074        let schema = Arc::new(Schema::new(vec![Field::new(
2075            "flag",
2076            arrow::datatypes::DataType::Boolean,
2077            true,
2078        )]));
2079
2080        let data = ResultData {
2081            columns: vec![],
2082            data: vec![
2083                vec![serde_json::json!("not-a-bool")], // String is not a bool
2084                vec![serde_json::json!(123)],          // Number is not a bool
2085            ],
2086            total_rows: 2,
2087        };
2088
2089        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2090
2091        let array = batch
2092            .column(0)
2093            .as_any()
2094            .downcast_ref::<arrow::array::BooleanArray>()
2095            .unwrap();
2096        // Non-bool values become null
2097        assert!(array.is_null(0));
2098        assert!(array.is_null(1));
2099    }
2100
2101    #[tokio::test]
2102    async fn test_column_major_to_record_batch_int32_null_from_non_int() {
2103        let schema = Arc::new(Schema::new(vec![Field::new(
2104            "value",
2105            arrow::datatypes::DataType::Int32,
2106            true,
2107        )]));
2108
2109        let data = ResultData {
2110            columns: vec![],
2111            data: vec![
2112                vec![serde_json::json!("not-an-int")], // String is not parseable as int
2113            ],
2114            total_rows: 1,
2115        };
2116
2117        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2118
2119        let array = batch
2120            .column(0)
2121            .as_any()
2122            .downcast_ref::<arrow::array::Int32Array>()
2123            .unwrap();
2124        assert!(array.is_null(0));
2125    }
2126
2127    #[tokio::test]
2128    async fn test_column_major_to_record_batch_float64_null_from_non_float() {
2129        let schema = Arc::new(Schema::new(vec![Field::new(
2130            "value",
2131            arrow::datatypes::DataType::Float64,
2132            true,
2133        )]));
2134
2135        let data = ResultData {
2136            columns: vec![],
2137            data: vec![
2138                vec![serde_json::json!("not-a-float")], // String is not parseable as float
2139            ],
2140            total_rows: 1,
2141        };
2142
2143        let batch = ResultSet::column_major_to_record_batch(&data, &schema).unwrap();
2144
2145        let array = batch
2146            .column(0)
2147            .as_any()
2148            .downcast_ref::<arrow::array::Float64Array>()
2149            .unwrap();
2150        assert!(array.is_null(0));
2151    }
2152
2153    #[tokio::test]
2154    async fn test_column_major_to_record_batch_unsupported_type_returns_error() {
2155        // Test that unsupported types (like LargeUtf8) result in an ArrowError
2156        // because the fallback creates a StringArray which doesn't match the schema
2157        let schema = Arc::new(Schema::new(vec![Field::new(
2158            "large_text",
2159            arrow::datatypes::DataType::LargeUtf8,
2160            true,
2161        )]));
2162
2163        let data = ResultData {
2164            columns: vec![],
2165            data: vec![vec![serde_json::json!("test_data")]],
2166            total_rows: 1,
2167        };
2168
2169        let result = ResultSet::column_major_to_record_batch(&data, &schema);
2170
2171        // Unsupported types cause an error because the fallback creates a StringArray
2172        // which doesn't match the expected LargeUtf8 type in the schema
2173        assert!(result.is_err());
2174    }
2175
2176    // =========================================================================
2177    // Tests for ResultSetIterator::metadata
2178    // =========================================================================
2179
2180    #[tokio::test]
2181    async fn test_result_set_iterator_metadata() {
2182        let mock_transport = MockTransport::new();
2183        let transport: Arc<Mutex<dyn TransportProtocol>> = Arc::new(Mutex::new(mock_transport));
2184
2185        let data = ResultData {
2186            columns: vec![ColumnInfo {
2187                name: "id".to_string(),
2188                data_type: DataType {
2189                    type_name: "DECIMAL".to_string(),
2190                    precision: Some(18),
2191                    scale: Some(0),
2192                    size: None,
2193                    character_set: None,
2194                    with_local_time_zone: None,
2195                    fraction: None,
2196                },
2197            }],
2198            data: vec![vec![serde_json::json!(1)]],
2199            total_rows: 1,
2200        };
2201
2202        let result = TransportQueryResult::ResultSet {
2203            handle: Some(ResultSetHandle::new(1)),
2204            data,
2205        };
2206
2207        let result_set = ResultSet::from_transport_result(result, transport).unwrap();
2208        let iterator = result_set.into_iterator().unwrap();
2209
2210        let metadata = iterator.metadata();
2211        assert_eq!(metadata.column_count, 1);
2212        assert_eq!(metadata.total_rows, Some(1));
2213        assert_eq!(metadata.column_names(), vec!["id"]);
2214    }
2215
2216    #[tokio::test]
2217    async fn test_result_set_iterator_metadata_multiple_columns() {
2218        let mock_transport = MockTransport::new();
2219        let transport: Arc<Mutex<dyn TransportProtocol>> = Arc::new(Mutex::new(mock_transport));
2220
2221        let data = ResultData {
2222            columns: vec![
2223                ColumnInfo {
2224                    name: "id".to_string(),
2225                    data_type: DataType {
2226                        type_name: "DECIMAL".to_string(),
2227                        precision: Some(18),
2228                        scale: Some(0),
2229                        size: None,
2230                        character_set: None,
2231                        with_local_time_zone: None,
2232                        fraction: None,
2233                    },
2234                },
2235                ColumnInfo {
2236                    name: "name".to_string(),
2237                    data_type: DataType {
2238                        type_name: "VARCHAR".to_string(),
2239                        precision: None,
2240                        scale: None,
2241                        size: Some(100),
2242                        character_set: Some("UTF8".to_string()),
2243                        with_local_time_zone: None,
2244                        fraction: None,
2245                    },
2246                },
2247                ColumnInfo {
2248                    name: "active".to_string(),
2249                    data_type: DataType {
2250                        type_name: "BOOLEAN".to_string(),
2251                        precision: None,
2252                        scale: None,
2253                        size: None,
2254                        character_set: None,
2255                        with_local_time_zone: None,
2256                        fraction: None,
2257                    },
2258                },
2259            ],
2260            data: vec![vec![
2261                serde_json::json!(1),
2262                serde_json::json!("Alice"),
2263                serde_json::json!(true),
2264            ]],
2265            total_rows: 1,
2266        };
2267
2268        let result = TransportQueryResult::ResultSet {
2269            handle: Some(ResultSetHandle::new(1)),
2270            data,
2271        };
2272
2273        let result_set = ResultSet::from_transport_result(result, transport).unwrap();
2274        let iterator = result_set.into_iterator().unwrap();
2275
2276        let metadata = iterator.metadata();
2277        assert_eq!(metadata.column_count, 3);
2278        assert_eq!(metadata.column_names(), vec!["id", "name", "active"]);
2279
2280        let types = metadata.column_types();
2281        assert_eq!(types.len(), 3);
2282        assert!(matches!(
2283            types[0],
2284            arrow::datatypes::DataType::Decimal128(18, 0)
2285        ));
2286        assert!(matches!(types[1], arrow::datatypes::DataType::Utf8));
2287        assert!(matches!(types[2], arrow::datatypes::DataType::Boolean));
2288    }
2289}