Skip to main content

hdbconnect_arrow/conversion/
processor.rs

1//! Batch processor for streaming conversion of HANA rows to `RecordBatch`es.
2//!
3//! Implements buffered batch creation with configurable batch size.
4
5use std::sync::Arc;
6
7use arrow_array::RecordBatch;
8use arrow_schema::{DataType, Schema, SchemaRef, TimeUnit};
9
10use crate::Result;
11use crate::builders::dispatch::{BuilderEnum, BuilderKind};
12use crate::builders::factory::BuilderFactory;
13use crate::traits::builder::HanaCompatibleBuilder;
14use crate::traits::row::RowLike;
15use crate::traits::streaming::BatchConfig;
16
17/// Append a HANA value to a builder, handling NULL values.
18#[inline]
19fn append_value_to_builder<B: HanaCompatibleBuilder>(
20    builder: &mut B,
21    value: &hdbconnect::HdbValue,
22) -> Result<()> {
23    match value {
24        hdbconnect::HdbValue::NULL => {
25            builder.append_null();
26            Ok(())
27        }
28        v => builder.append_hana_value(v),
29    }
30}
31
/// Expands to a per-column append loop for a homogeneous schema whose
/// `BuilderEnum` variant stores its builder inline.
///
/// The concrete builder is extracted from each `BuilderEnum::$variant` and
/// passed to `append_value_to_builder`, so the append call is made on the
/// concrete builder type. The expansion uses `?` and evaluates to `Ok(())`,
/// so it must be used inside a function returning `Result<()>`.
macro_rules! specialize_homogeneous_loop {
    ($self:expr, $row:expr, $variant:ident) => {{
        for (col_idx, slot) in $self.builders.iter_mut().enumerate() {
            match slot {
                BuilderEnum::$variant(typed) => {
                    append_value_to_builder(typed, &$row.get(col_idx))?;
                }
                // Any other variant means the profile and the builders disagree.
                _ => unreachable!("SchemaProfile guarantees homogeneous type"),
            }
        }
        Ok(())
    }};
}
46
/// Expands to a per-column append loop for a homogeneous schema whose
/// `BuilderEnum` variant stores its builder behind a box.
///
/// Identical to `specialize_homogeneous_loop` except that the payload is
/// converted to a mutable reference with `as_mut()` before being passed to
/// `append_value_to_builder`. The expansion uses `?` and evaluates to
/// `Ok(())`, so it must be used inside a function returning `Result<()>`.
macro_rules! specialize_homogeneous_loop_boxed {
    ($self:expr, $row:expr, $variant:ident) => {{
        for (col_idx, slot) in $self.builders.iter_mut().enumerate() {
            match slot {
                BuilderEnum::$variant(boxed) => {
                    append_value_to_builder(boxed.as_mut(), &$row.get(col_idx))?;
                }
                // Any other variant means the profile and the builders disagree.
                _ => unreachable!("SchemaProfile guarantees homogeneous type"),
            }
        }
        Ok(())
    }};
}
59
60/// Schema profile for processor optimization.
61///
62/// Classifies schemas as homogeneous (all columns same type) or mixed
63/// to enable specialized processing paths.
64#[derive(Debug, Clone)]
65pub enum SchemaProfile {
66    /// All columns share the same type.
67    Homogeneous {
68        /// Number of columns in the schema.
69        column_count: usize,
70        /// The common builder kind for all columns.
71        kind: BuilderKind,
72    },
73    /// Columns have different types.
74    Mixed,
75}
76
77impl SchemaProfile {
78    /// Analyze schema and return its profile.
79    #[must_use]
80    pub fn analyze(schema: &Schema) -> Self {
81        let fields = schema.fields();
82        if fields.is_empty() {
83            return Self::Mixed;
84        }
85
86        let first_type = fields[0].data_type();
87        let all_same = fields
88            .iter()
89            .skip(1)
90            .all(|f| Self::types_equivalent(first_type, f.data_type()));
91
92        // Verify discriminant comparison works correctly for homogeneous schemas
93        debug_assert!(
94            !all_same
95                || fields.iter().all(|f| {
96                    let kind1 = Self::data_type_to_kind(first_type);
97                    let kind2 = Self::data_type_to_kind(f.data_type());
98                    kind1 == kind2
99                }),
100            "Discriminant-equivalent types must map to same BuilderKind"
101        );
102
103        if all_same {
104            Self::Homogeneous {
105                column_count: fields.len(),
106                kind: Self::data_type_to_kind(first_type),
107            }
108        } else {
109            Self::Mixed
110        }
111    }
112
113    /// Check if two `DataTypes` are equivalent for homogeneous schema detection.
114    ///
115    /// Uses discriminant comparison, which treats types as equivalent if they
116    /// have the same variant, ignoring associated data. For example:
117    /// - `Decimal128(18, 2)` is equivalent to `Decimal128(10, 4)`
118    /// - `Timestamp(Nanosecond, None)` is equivalent to `Timestamp(Nanosecond, Some(tz))`
119    ///
120    /// This design choice enables homogeneous optimization for schemas where
121    /// all columns use the same Arrow type variant, even if parameters differ.
122    fn types_equivalent(a: &DataType, b: &DataType) -> bool {
123        std::mem::discriminant(a) == std::mem::discriminant(b)
124    }
125
126    #[allow(clippy::match_same_arms, clippy::missing_const_for_fn)]
127    fn data_type_to_kind(dt: &DataType) -> BuilderKind {
128        match dt {
129            DataType::UInt8 => BuilderKind::UInt8,
130            DataType::Int16 => BuilderKind::Int16,
131            DataType::Int32 => BuilderKind::Int32,
132            DataType::Int64 => BuilderKind::Int64,
133            DataType::Float32 => BuilderKind::Float32,
134            DataType::Float64 => BuilderKind::Float64,
135            DataType::Decimal128(_, _) => BuilderKind::Decimal128,
136            DataType::Boolean => BuilderKind::Boolean,
137            DataType::Utf8 => BuilderKind::Utf8,
138            DataType::LargeUtf8 => BuilderKind::LargeUtf8,
139            DataType::Binary => BuilderKind::Binary,
140            DataType::LargeBinary => BuilderKind::LargeBinary,
141            DataType::FixedSizeBinary(_) => BuilderKind::FixedSizeBinary,
142            DataType::Date32 => BuilderKind::Date32,
143            DataType::Time64(TimeUnit::Nanosecond) => BuilderKind::Time64Nanosecond,
144            DataType::Timestamp(TimeUnit::Nanosecond, None) => BuilderKind::TimestampNanosecond,
145            _ => BuilderKind::Utf8,
146        }
147    }
148
149    /// Returns true if the schema is homogeneous.
150    #[must_use]
151    pub const fn is_homogeneous(&self) -> bool {
152        matches!(self, Self::Homogeneous { .. })
153    }
154}
155
156/// Processor that converts HANA rows into Arrow `RecordBatch`es.
157///
158/// Buffers rows until `batch_size` is reached, then emits a `RecordBatch`.
159/// Uses enum-based dispatch to eliminate vtable overhead.
160///
161/// # Example
162///
163/// ```rust,ignore
164/// use hdbconnect_arrow::conversion::HanaBatchProcessor;
165/// use hdbconnect_arrow::traits::streaming::BatchConfig;
166///
167/// let schema = /* Arrow schema */;
168/// let config = BatchConfig::with_batch_size(10000);
169/// let mut processor = HanaBatchProcessor::new(Arc::new(schema), config);
170///
171/// for row in result_set {
172///     if let Some(batch) = processor.process_row(row)? {
173///         // Process batch
174///     }
175/// }
176///
177/// // Don't forget to flush remaining rows
178/// if let Some(batch) = processor.flush()? {
179///     // Process final batch
180/// }
181/// ```
182pub struct HanaBatchProcessor {
183    schema: SchemaRef,
184    config: BatchConfig,
185    builders: Vec<BuilderEnum>,
186    profile: SchemaProfile,
187    row_count: usize,
188}
189
190impl std::fmt::Debug for HanaBatchProcessor {
191    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192        f.debug_struct("HanaBatchProcessor")
193            .field("schema", &self.schema)
194            .field("config", &self.config)
195            .field("builders", &format!("[{} builders]", self.builders.len()))
196            .field("profile", &self.profile)
197            .field("row_count", &self.row_count)
198            .finish()
199    }
200}
201
202impl HanaBatchProcessor {
203    /// Create a new batch processor.
204    ///
205    /// # Arguments
206    ///
207    /// * `schema` - Arrow schema for the batches
208    /// * `config` - Batch processing configuration
209    #[must_use]
210    pub fn new(schema: SchemaRef, config: BatchConfig) -> Self {
211        let factory = BuilderFactory::from_config(&config);
212        let builders = factory.create_builders_enum_for_schema_with_metadata(&schema);
213        let profile = SchemaProfile::analyze(&schema);
214
215        Self {
216            schema,
217            config,
218            builders,
219            profile,
220            row_count: 0,
221        }
222    }
223
224    /// Create with default configuration.
225    #[must_use]
226    pub fn with_defaults(schema: SchemaRef) -> Self {
227        Self::new(schema, BatchConfig::default())
228    }
229
230    /// Process a single row.
231    ///
232    /// Returns `Ok(Some(batch))` when a batch is ready, `Ok(None)` when more
233    /// rows are needed to fill a batch.
234    ///
235    /// # Errors
236    ///
237    /// Returns error if value conversion fails or schema mismatches.
238    pub fn process_row(&mut self, row: &hdbconnect::Row) -> Result<Option<RecordBatch>> {
239        self.process_row_generic(row)
240    }
241
242    /// Process a single row using the generic `RowLike` trait.
243    ///
244    /// This method enables unit testing with `MockRow` instead of requiring
245    /// a HANA connection.
246    ///
247    /// Returns `Ok(Some(batch))` when a batch is ready, `Ok(None)` when more
248    /// rows are needed to fill a batch.
249    ///
250    /// # Errors
251    ///
252    /// Returns error if value conversion fails or schema mismatches.
253    ///
254    /// # Example
255    ///
256    /// ```rust,ignore
257    /// use hdbconnect_arrow::traits::row::{MockRow, MockRowBuilder};
258    ///
259    /// let row = MockRowBuilder::new().int(42).string("test").build();
260    /// let result = processor.process_row_generic(&row)?;
261    /// ```
262    pub fn process_row_generic<R: RowLike>(&mut self, row: &R) -> Result<Option<RecordBatch>> {
263        if row.len() != self.builders.len() {
264            return Err(crate::ArrowConversionError::schema_mismatch(
265                self.builders.len(),
266                row.len(),
267            ));
268        }
269
270        match &self.profile {
271            SchemaProfile::Homogeneous { kind, .. } => {
272                self.process_row_homogeneous(row, *kind)?;
273            }
274            SchemaProfile::Mixed => {
275                self.process_row_mixed(row)?;
276            }
277        }
278
279        self.row_count += 1;
280
281        if self.row_count >= self.config.batch_size.get() {
282            return Ok(Some(self.finish_current_batch()?));
283        }
284
285        Ok(None)
286    }
287
288    /// Process row for homogeneous schemas with specialized dispatch.
289    ///
290    /// Hoists type match outside the column loop, creating monomorphized
291    /// inner loops with zero enum dispatch overhead. Specialized for the
292    /// top 5 most common HANA types (Int64, Decimal128, Utf8, Int32, Float64).
293    ///
294    /// Other types fall back to generic enum dispatch path.
295    fn process_row_homogeneous<R: RowLike>(&mut self, row: &R, kind: BuilderKind) -> Result<()> {
296        match kind {
297            BuilderKind::Int64 => {
298                specialize_homogeneous_loop!(self, row, Int64)
299            }
300            BuilderKind::Decimal128 => {
301                specialize_homogeneous_loop_boxed!(self, row, Decimal128)
302            }
303            BuilderKind::Utf8 => {
304                specialize_homogeneous_loop_boxed!(self, row, Utf8)
305            }
306            BuilderKind::Int32 => {
307                specialize_homogeneous_loop!(self, row, Int32)
308            }
309            BuilderKind::Float64 => {
310                specialize_homogeneous_loop!(self, row, Float64)
311            }
312            _ => self.process_row_mixed(row),
313        }
314    }
315
316    /// Process row for mixed schemas with enum dispatch per column.
317    fn process_row_mixed<R: RowLike>(&mut self, row: &R) -> Result<()> {
318        for (i, builder) in self.builders.iter_mut().enumerate() {
319            let value = row.get(i);
320            match value {
321                hdbconnect::HdbValue::NULL => builder.append_null(),
322                v => builder.append_hana_value(v)?,
323            }
324        }
325        Ok(())
326    }
327
328    /// Flush any remaining rows as a final batch.
329    ///
330    /// # Errors
331    ///
332    /// Returns error if `RecordBatch` creation fails.
333    pub fn flush(&mut self) -> Result<Option<RecordBatch>> {
334        if self.row_count == 0 {
335            return Ok(None);
336        }
337
338        Ok(Some(self.finish_current_batch()?))
339    }
340
341    /// Returns the schema of batches produced by this processor.
342    #[must_use]
343    pub fn schema(&self) -> SchemaRef {
344        Arc::clone(&self.schema)
345    }
346
347    /// Returns the current row count in the buffer.
348    #[must_use]
349    pub const fn buffered_rows(&self) -> usize {
350        self.row_count
351    }
352
353    /// Returns the schema profile for this processor.
354    #[must_use]
355    pub const fn profile(&self) -> &SchemaProfile {
356        &self.profile
357    }
358
359    /// Finish the current batch and reset builders.
360    ///
361    /// Arrow builders reset their internal state after `finish()`, keeping
362    /// allocated capacity for the next batch. This avoids heap allocations
363    /// at batch boundaries.
364    ///
365    /// # Errors
366    ///
367    /// Returns error if `RecordBatch` creation fails.
368    fn finish_current_batch(&mut self) -> Result<RecordBatch> {
369        let arrays: Vec<_> = self.builders.iter_mut().map(BuilderEnum::finish).collect();
370
371        let batch = RecordBatch::try_new(Arc::clone(&self.schema), arrays)
372            .map_err(|e| crate::ArrowConversionError::value_conversion("batch", e.to_string()))?;
373
374        self.row_count = 0;
375
376        Ok(batch)
377    }
378}
379
380#[cfg(test)]
381mod tests {
382    use arrow_schema::{DataType, Field, Schema};
383
384    use super::*;
385
386    // ═══════════════════════════════════════════════════════════════════════════
387    // SchemaProfile Tests
388    // ═══════════════════════════════════════════════════════════════════════════
389
390    #[test]
391    fn test_schema_profile_homogeneous_int64() {
392        let schema = Schema::new(vec![
393            Field::new("col1", DataType::Int64, false),
394            Field::new("col2", DataType::Int64, false),
395            Field::new("col3", DataType::Int64, false),
396        ]);
397
398        let profile = SchemaProfile::analyze(&schema);
399        assert!(profile.is_homogeneous());
400        match profile {
401            SchemaProfile::Homogeneous { column_count, kind } => {
402                assert_eq!(column_count, 3);
403                assert_eq!(kind, BuilderKind::Int64);
404            }
405            SchemaProfile::Mixed => panic!("Expected homogeneous profile"),
406        }
407    }
408
409    #[test]
410    fn test_schema_profile_homogeneous_utf8() {
411        let schema = Schema::new(vec![
412            Field::new("col1", DataType::Utf8, true),
413            Field::new("col2", DataType::Utf8, true),
414        ]);
415
416        let profile = SchemaProfile::analyze(&schema);
417        assert!(profile.is_homogeneous());
418        match profile {
419            SchemaProfile::Homogeneous { column_count, kind } => {
420                assert_eq!(column_count, 2);
421                assert_eq!(kind, BuilderKind::Utf8);
422            }
423            SchemaProfile::Mixed => panic!("Expected homogeneous profile"),
424        }
425    }
426
427    #[test]
428    fn test_schema_profile_mixed() {
429        let schema = Schema::new(vec![
430            Field::new("id", DataType::Int64, false),
431            Field::new("name", DataType::Utf8, true),
432            Field::new("active", DataType::Boolean, false),
433        ]);
434
435        let profile = SchemaProfile::analyze(&schema);
436        assert!(!profile.is_homogeneous());
437        assert!(matches!(profile, SchemaProfile::Mixed));
438    }
439
440    #[test]
441    fn test_schema_profile_single_column() {
442        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
443
444        let profile = SchemaProfile::analyze(&schema);
445        assert!(profile.is_homogeneous());
446        match profile {
447            SchemaProfile::Homogeneous { column_count, kind } => {
448                assert_eq!(column_count, 1);
449                assert_eq!(kind, BuilderKind::Int32);
450            }
451            SchemaProfile::Mixed => panic!("Expected homogeneous profile"),
452        }
453    }
454
455    #[test]
456    fn test_schema_profile_empty() {
457        let fields: Vec<Field> = vec![];
458        let schema = Schema::new(fields);
459
460        let profile = SchemaProfile::analyze(&schema);
461        assert!(!profile.is_homogeneous());
462        assert!(matches!(profile, SchemaProfile::Mixed));
463    }
464
465    #[test]
466    fn test_schema_profile_decimal_same_precision_scale() {
467        let schema = Schema::new(vec![
468            Field::new("price1", DataType::Decimal128(18, 2), false),
469            Field::new("price2", DataType::Decimal128(18, 2), false),
470        ]);
471
472        let profile = SchemaProfile::analyze(&schema);
473        assert!(profile.is_homogeneous());
474    }
475
476    #[test]
477    fn test_schema_profile_decimal_different_precision() {
478        let schema = Schema::new(vec![
479            Field::new("price1", DataType::Decimal128(18, 2), false),
480            Field::new("price2", DataType::Decimal128(10, 4), false),
481        ]);
482
483        let profile = SchemaProfile::analyze(&schema);
484        assert!(profile.is_homogeneous());
485    }
486
487    // ═══════════════════════════════════════════════════════════════════════════
488    // Processor Creation Tests
489    // ═══════════════════════════════════════════════════════════════════════════
490
491    #[test]
492    fn test_processor_creation() {
493        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
494        let config = BatchConfig::with_batch_size(100);
495
496        let processor = HanaBatchProcessor::new(schema, config);
497        assert_eq!(processor.buffered_rows(), 0);
498    }
499
500    #[test]
501    fn test_processor_with_defaults() {
502        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
503        let processor = HanaBatchProcessor::with_defaults(schema);
504        assert_eq!(processor.buffered_rows(), 0);
505    }
506
507    #[test]
508    fn test_processor_schema() {
509        let schema = Arc::new(Schema::new(vec![
510            Field::new("id", DataType::Int32, false),
511            Field::new("name", DataType::Utf8, true),
512        ]));
513        let processor = HanaBatchProcessor::with_defaults(Arc::clone(&schema));
514
515        let returned_schema = processor.schema();
516        assert_eq!(returned_schema.fields().len(), 2);
517        assert_eq!(returned_schema.field(0).name(), "id");
518        assert_eq!(returned_schema.field(1).name(), "name");
519    }
520
521    #[test]
522    fn test_processor_initial_buffered_rows() {
523        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
524        let processor = HanaBatchProcessor::with_defaults(schema);
525        assert_eq!(processor.buffered_rows(), 0);
526    }
527
528    #[test]
529    fn test_processor_profile_homogeneous() {
530        let schema = Arc::new(Schema::new(vec![
531            Field::new("col1", DataType::Int64, false),
532            Field::new("col2", DataType::Int64, false),
533        ]));
534        let processor = HanaBatchProcessor::with_defaults(schema);
535        assert!(processor.profile().is_homogeneous());
536    }
537
538    #[test]
539    fn test_processor_profile_mixed() {
540        let schema = Arc::new(Schema::new(vec![
541            Field::new("id", DataType::Int64, false),
542            Field::new("name", DataType::Utf8, true),
543        ]));
544        let processor = HanaBatchProcessor::with_defaults(schema);
545        assert!(!processor.profile().is_homogeneous());
546    }
547
548    // ═══════════════════════════════════════════════════════════════════════════
549    // Processor with Different Configs
550    // ═══════════════════════════════════════════════════════════════════════════
551
552    #[test]
553    fn test_processor_with_small_batch_size() {
554        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
555        let config = BatchConfig::with_batch_size(10);
556        let processor = HanaBatchProcessor::new(schema, config);
557        assert_eq!(processor.buffered_rows(), 0);
558    }
559
560    #[test]
561    fn test_processor_with_large_batch_size() {
562        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
563        let config = BatchConfig::with_batch_size(100000);
564        let processor = HanaBatchProcessor::new(schema, config);
565        assert_eq!(processor.buffered_rows(), 0);
566    }
567
568    #[test]
569    fn test_processor_with_custom_config() {
570        let schema = Arc::new(Schema::new(vec![Field::new("data", DataType::Utf8, true)]));
571        let config = BatchConfig::with_batch_size(500)
572            .string_capacity(10000)
573            .binary_capacity(5000);
574        let processor = HanaBatchProcessor::new(schema, config);
575        assert_eq!(processor.buffered_rows(), 0);
576    }
577
578    // ═══════════════════════════════════════════════════════════════════════════
579    // Processor with Different Schema Types
580    // ═══════════════════════════════════════════════════════════════════════════
581
582    #[test]
583    fn test_processor_with_empty_schema() {
584        let fields: Vec<Field> = vec![];
585        let schema = Arc::new(Schema::new(fields));
586        let processor = HanaBatchProcessor::with_defaults(schema);
587        assert_eq!(processor.buffered_rows(), 0);
588    }
589
590    #[test]
591    fn test_processor_with_single_column_schema() {
592        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
593        let processor = HanaBatchProcessor::with_defaults(schema);
594        assert_eq!(processor.buffered_rows(), 0);
595    }
596
597    #[test]
598    fn test_processor_with_multi_column_schema() {
599        let schema = Arc::new(Schema::new(vec![
600            Field::new("id", DataType::Int64, false),
601            Field::new("name", DataType::Utf8, true),
602            Field::new("price", DataType::Decimal128(18, 2), false),
603            Field::new("is_active", DataType::Boolean, false),
604        ]));
605        let processor = HanaBatchProcessor::with_defaults(schema);
606        assert_eq!(processor.buffered_rows(), 0);
607    }
608
609    #[test]
610    fn test_processor_with_all_numeric_types() {
611        let schema = Arc::new(Schema::new(vec![
612            Field::new("tiny", DataType::UInt8, false),
613            Field::new("small", DataType::Int16, false),
614            Field::new("int", DataType::Int32, false),
615            Field::new("big", DataType::Int64, false),
616            Field::new("real", DataType::Float32, false),
617            Field::new("double", DataType::Float64, false),
618        ]));
619        let processor = HanaBatchProcessor::with_defaults(schema);
620        assert_eq!(processor.buffered_rows(), 0);
621    }
622
623    #[test]
624    fn test_processor_with_string_types() {
625        let schema = Arc::new(Schema::new(vec![
626            Field::new("small_str", DataType::Utf8, true),
627            Field::new("large_str", DataType::LargeUtf8, true),
628        ]));
629        let processor = HanaBatchProcessor::with_defaults(schema);
630        assert_eq!(processor.buffered_rows(), 0);
631    }
632
633    #[test]
634    fn test_processor_with_binary_types() {
635        let schema = Arc::new(Schema::new(vec![
636            Field::new("bin", DataType::Binary, true),
637            Field::new("large_bin", DataType::LargeBinary, true),
638            Field::new("fixed_bin", DataType::FixedSizeBinary(16), true),
639        ]));
640        let processor = HanaBatchProcessor::with_defaults(schema);
641        assert_eq!(processor.buffered_rows(), 0);
642    }
643
644    // ═══════════════════════════════════════════════════════════════════════════
645    // Flush Tests (without rows - tests empty flush)
646    // ═══════════════════════════════════════════════════════════════════════════
647
648    #[test]
649    fn test_processor_flush_empty() {
650        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
651        let mut processor = HanaBatchProcessor::with_defaults(schema);
652
653        let result = processor.flush();
654        assert!(result.is_ok());
655        assert!(result.unwrap().is_none());
656    }
657
658    #[test]
659    fn test_processor_flush_multiple_times_when_empty() {
660        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
661        let mut processor = HanaBatchProcessor::with_defaults(schema);
662
663        assert!(processor.flush().unwrap().is_none());
664        assert!(processor.flush().unwrap().is_none());
665        assert!(processor.flush().unwrap().is_none());
666    }
667
668    // ═══════════════════════════════════════════════════════════════════════════
669    // Debug Tests
670    // ═══════════════════════════════════════════════════════════════════════════
671
672    #[test]
673    fn test_processor_debug() {
674        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
675        let processor = HanaBatchProcessor::with_defaults(schema);
676
677        let debug_str = format!("{:?}", processor);
678        assert!(debug_str.contains("HanaBatchProcessor"));
679        assert!(debug_str.contains("row_count"));
680        assert!(debug_str.contains("builders"));
681        assert!(debug_str.contains("profile"));
682    }
683
684    // ═══════════════════════════════════════════════════════════════════════════
685    // Schema Ref Tests
686    // ═══════════════════════════════════════════════════════════════════════════
687
688    #[test]
689    fn test_processor_schema_returns_same_schema() {
690        let original_schema = Arc::new(Schema::new(vec![
691            Field::new("id", DataType::Int32, false),
692            Field::new("value", DataType::Float64, true),
693        ]));
694        let processor = HanaBatchProcessor::with_defaults(Arc::clone(&original_schema));
695
696        let schema1 = processor.schema();
697        let schema2 = processor.schema();
698
699        assert!(Arc::ptr_eq(&schema1, &schema2));
700    }
701
702    // ═══════════════════════════════════════════════════════════════════════════
703    // MockRow Tests (unit testing without HANA connection)
704    // ═══════════════════════════════════════════════════════════════════════════
705
706    #[test]
707    fn test_process_row_generic_with_mock_row() {
708        use crate::traits::row::MockRowBuilder;
709
710        let schema = Arc::new(Schema::new(vec![
711            Field::new("id", DataType::Int32, false),
712            Field::new("name", DataType::Utf8, true),
713        ]));
714        let config = BatchConfig::with_batch_size(10);
715        let mut processor = HanaBatchProcessor::new(schema, config);
716
717        let row = MockRowBuilder::new().int(42).string("test").build();
718
719        let result = processor.process_row_generic(&row);
720        assert!(result.is_ok());
721        assert!(result.unwrap().is_none()); // Not enough rows for batch
722        assert_eq!(processor.buffered_rows(), 1);
723    }
724
725    #[test]
726    fn test_process_row_generic_batch_ready() {
727        use crate::traits::row::MockRowBuilder;
728
729        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
730        let config = BatchConfig::with_batch_size(3);
731        let mut processor = HanaBatchProcessor::new(schema, config);
732
733        // Add rows until batch is ready
734        for i in 0..3 {
735            let row = MockRowBuilder::new().int(i).build();
736            let result = processor.process_row_generic(&row).unwrap();
737            if i < 2 {
738                assert!(result.is_none());
739            } else {
740                // Third row should trigger batch
741                let batch = result.expect("batch should be ready");
742                assert_eq!(batch.num_rows(), 3);
743            }
744        }
745    }
746
747    #[test]
748    fn test_process_row_generic_with_nulls() {
749        use crate::traits::row::MockRowBuilder;
750
751        let schema = Arc::new(Schema::new(vec![
752            Field::new("id", DataType::Int32, true),
753            Field::new("name", DataType::Utf8, true),
754        ]));
755        let config = BatchConfig::with_batch_size(2);
756        let mut processor = HanaBatchProcessor::new(schema, config);
757
758        // Row with null values
759        let row = MockRowBuilder::new().null().null().build();
760
761        let result = processor.process_row_generic(&row);
762        assert!(result.is_ok());
763        assert_eq!(processor.buffered_rows(), 1);
764    }
765
766    #[test]
767    fn test_process_row_generic_schema_mismatch() {
768        use crate::traits::row::MockRowBuilder;
769
770        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
771        let mut processor = HanaBatchProcessor::with_defaults(schema);
772
773        // Row with wrong number of columns
774        let row = MockRowBuilder::new().int(1).string("extra").build();
775
776        let result = processor.process_row_generic(&row);
777        assert!(result.is_err());
778        let err = result.unwrap_err();
779        assert!(err.is_schema_mismatch());
780    }
781
782    #[test]
783    fn test_process_row_generic_flush() {
784        use crate::traits::row::MockRowBuilder;
785
786        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
787        let config = BatchConfig::with_batch_size(100);
788        let mut processor = HanaBatchProcessor::new(schema, config);
789
790        // Add some rows (less than batch size)
791        for i in 0..5 {
792            let row = MockRowBuilder::new().int(i).build();
793            processor.process_row_generic(&row).unwrap();
794        }
795
796        assert_eq!(processor.buffered_rows(), 5);
797
798        // Flush remaining rows
799        let batch = processor
800            .flush()
801            .unwrap()
802            .expect("should have remaining rows");
803        assert_eq!(batch.num_rows(), 5);
804        assert_eq!(processor.buffered_rows(), 0);
805    }
806
807    // ═══════════════════════════════════════════════════════════════════════════
808    // Builder Reuse Tests
809    // ═══════════════════════════════════════════════════════════════════════════
810
811    #[test]
812    fn test_builder_reuse_after_finish() {
813        use crate::traits::row::MockRowBuilder;
814
815        let schema = Arc::new(Schema::new(vec![
816            Field::new("id", DataType::Int32, false),
817            Field::new("name", DataType::Utf8, true),
818        ]));
819        let config = BatchConfig::with_batch_size(2);
820        let mut processor = HanaBatchProcessor::new(schema, config);
821
822        // Process first batch
823        for i in 0..2 {
824            let row = MockRowBuilder::new().int(i).string("test").build();
825            let result = processor.process_row_generic(&row).unwrap();
826            if i == 1 {
827                assert!(result.is_some(), "First batch should be ready");
828            }
829        }
830
831        // Verify processor can continue processing (builders reused)
832        for i in 2..4 {
833            let row = MockRowBuilder::new().int(i).string("test2").build();
834            let result = processor.process_row_generic(&row).unwrap();
835            if i == 3 {
836                let batch = result.expect("Second batch should be ready");
837                assert_eq!(batch.num_rows(), 2);
838                // Verify data is from second batch, not first
839                let id_array = batch
840                    .column(0)
841                    .as_any()
842                    .downcast_ref::<arrow_array::Int32Array>()
843                    .unwrap();
844                assert_eq!(id_array.value(0), 2);
845                assert_eq!(id_array.value(1), 3);
846            }
847        }
848    }
849
850    // ═══════════════════════════════════════════════════════════════════════════
851    // Homogeneous Schema Processing Tests
852    // ═══════════════════════════════════════════════════════════════════════════
853
854    #[test]
855    fn test_processor_homogeneous_int64() {
856        use crate::traits::row::MockRowBuilder;
857
858        let schema = Arc::new(Schema::new(vec![
859            Field::new("col1", DataType::Int64, false),
860            Field::new("col2", DataType::Int64, false),
861            Field::new("col3", DataType::Int64, false),
862        ]));
863        let config = BatchConfig::with_batch_size(3);
864        let mut processor = HanaBatchProcessor::new(schema, config);
865
866        assert!(processor.profile().is_homogeneous());
867
868        // Process rows
869        processor
870            .process_row_generic(&MockRowBuilder::new().bigint(1).bigint(2).bigint(3).build())
871            .unwrap();
872        processor
873            .process_row_generic(&MockRowBuilder::new().bigint(4).bigint(5).bigint(6).build())
874            .unwrap();
875        let result = processor
876            .process_row_generic(&MockRowBuilder::new().bigint(7).bigint(8).bigint(9).build())
877            .unwrap();
878
879        let batch = result.expect("batch should be ready");
880        assert_eq!(batch.num_rows(), 3);
881        assert_eq!(batch.num_columns(), 3);
882    }
883
884    #[test]
885    fn test_processor_homogeneous_int32() {
886        use crate::traits::row::MockRowBuilder;
887
888        let schema = Arc::new(Schema::new(vec![
889            Field::new("col1", DataType::Int32, false),
890            Field::new("col2", DataType::Int32, false),
891            Field::new("col3", DataType::Int32, false),
892        ]));
893        let config = BatchConfig::with_batch_size(2);
894        let mut processor = HanaBatchProcessor::new(schema, config);
895
896        assert!(processor.profile().is_homogeneous());
897
898        processor
899            .process_row_generic(&MockRowBuilder::new().int(10).int(20).int(30).build())
900            .unwrap();
901        let result = processor
902            .process_row_generic(&MockRowBuilder::new().int(40).int(50).int(60).build())
903            .unwrap();
904
905        let batch = result.expect("batch should be ready");
906        assert_eq!(batch.num_rows(), 2);
907        assert_eq!(batch.num_columns(), 3);
908    }
909
910    #[test]
911    fn test_processor_homogeneous_float64() {
912        use crate::traits::row::MockRowBuilder;
913
914        let schema = Arc::new(Schema::new(vec![
915            Field::new("col1", DataType::Float64, false),
916            Field::new("col2", DataType::Float64, false),
917        ]));
918        let config = BatchConfig::with_batch_size(2);
919        let mut processor = HanaBatchProcessor::new(schema, config);
920
921        assert!(processor.profile().is_homogeneous());
922
923        processor
924            .process_row_generic(&MockRowBuilder::new().double(1.5).double(2.5).build())
925            .unwrap();
926        let result = processor
927            .process_row_generic(&MockRowBuilder::new().double(3.5).double(4.5).build())
928            .unwrap();
929
930        let batch = result.expect("batch should be ready");
931        assert_eq!(batch.num_rows(), 2);
932        assert_eq!(batch.num_columns(), 2);
933    }
934
935    #[test]
936    fn test_processor_homogeneous_decimal128() {
937        use crate::traits::row::MockRowBuilder;
938
939        let schema = Arc::new(Schema::new(vec![
940            Field::new("col1", DataType::Decimal128(18, 2), false),
941            Field::new("col2", DataType::Decimal128(18, 2), false),
942        ]));
943        let config = BatchConfig::with_batch_size(2);
944        let mut processor = HanaBatchProcessor::new(schema, config);
945
946        assert!(processor.profile().is_homogeneous());
947
948        processor
949            .process_row_generic(
950                &MockRowBuilder::new()
951                    .decimal_str("100.50")
952                    .decimal_str("200.75")
953                    .build(),
954            )
955            .unwrap();
956        let result = processor
957            .process_row_generic(
958                &MockRowBuilder::new()
959                    .decimal_str("300.25")
960                    .decimal_str("400.99")
961                    .build(),
962            )
963            .unwrap();
964
965        let batch = result.expect("batch should be ready");
966        assert_eq!(batch.num_rows(), 2);
967        assert_eq!(batch.num_columns(), 2);
968    }
969
970    #[test]
971    fn test_processor_homogeneous_utf8() {
972        use crate::traits::row::MockRowBuilder;
973
974        let schema = Arc::new(Schema::new(vec![
975            Field::new("col1", DataType::Utf8, true),
976            Field::new("col2", DataType::Utf8, true),
977            Field::new("col3", DataType::Utf8, true),
978        ]));
979        let config = BatchConfig::with_batch_size(2);
980        let mut processor = HanaBatchProcessor::new(schema, config);
981
982        assert!(processor.profile().is_homogeneous());
983
984        processor
985            .process_row_generic(
986                &MockRowBuilder::new()
987                    .string("alice")
988                    .string("bob")
989                    .string("charlie")
990                    .build(),
991            )
992            .unwrap();
993        let result = processor
994            .process_row_generic(
995                &MockRowBuilder::new()
996                    .string("diana")
997                    .string("eve")
998                    .string("frank")
999                    .build(),
1000            )
1001            .unwrap();
1002
1003        let batch = result.expect("batch should be ready");
1004        assert_eq!(batch.num_rows(), 2);
1005        assert_eq!(batch.num_columns(), 3);
1006    }
1007
1008    #[test]
1009    fn test_processor_mixed_schema() {
1010        use crate::traits::row::MockRowBuilder;
1011
1012        let schema = Arc::new(Schema::new(vec![
1013            Field::new("id", DataType::Int64, false),
1014            Field::new("name", DataType::Utf8, true),
1015            Field::new("active", DataType::Boolean, false),
1016        ]));
1017        let config = BatchConfig::with_batch_size(2);
1018        let mut processor = HanaBatchProcessor::new(schema, config);
1019
1020        assert!(!processor.profile().is_homogeneous());
1021
1022        // Process rows
1023        processor
1024            .process_row_generic(
1025                &MockRowBuilder::new()
1026                    .bigint(1)
1027                    .string("Alice")
1028                    .boolean(true)
1029                    .build(),
1030            )
1031            .unwrap();
1032        let result = processor
1033            .process_row_generic(
1034                &MockRowBuilder::new()
1035                    .bigint(2)
1036                    .string("Bob")
1037                    .boolean(false)
1038                    .build(),
1039            )
1040            .unwrap();
1041
1042        let batch = result.expect("batch should be ready");
1043        assert_eq!(batch.num_rows(), 2);
1044        assert_eq!(batch.num_columns(), 3);
1045    }
1046
1047    #[test]
1048    fn test_processor_homogeneous_with_nulls() {
1049        use crate::traits::row::MockRowBuilder;
1050
1051        let schema = Arc::new(Schema::new(vec![
1052            Field::new("col1", DataType::Int64, true),
1053            Field::new("col2", DataType::Int64, true),
1054        ]));
1055        let config = BatchConfig::with_batch_size(2);
1056        let mut processor = HanaBatchProcessor::new(schema, config);
1057
1058        assert!(processor.profile().is_homogeneous());
1059
1060        processor
1061            .process_row_generic(&MockRowBuilder::new().bigint(1).null().build())
1062            .unwrap();
1063        let result = processor
1064            .process_row_generic(&MockRowBuilder::new().null().bigint(2).build())
1065            .unwrap();
1066
1067        let batch = result.expect("batch should be ready");
1068        assert_eq!(batch.num_rows(), 2);
1069    }
1070
1071    #[test]
1072    fn test_processor_homogeneous_int32_with_nulls() {
1073        use crate::traits::row::MockRowBuilder;
1074
1075        let schema = Arc::new(Schema::new(vec![
1076            Field::new("col1", DataType::Int32, true),
1077            Field::new("col2", DataType::Int32, true),
1078            Field::new("col3", DataType::Int32, true),
1079        ]));
1080        let config = BatchConfig::with_batch_size(3);
1081        let mut processor = HanaBatchProcessor::new(schema, config);
1082
1083        assert!(processor.profile().is_homogeneous());
1084
1085        processor
1086            .process_row_generic(&MockRowBuilder::new().int(1).null().int(3).build())
1087            .unwrap();
1088        processor
1089            .process_row_generic(&MockRowBuilder::new().null().int(5).null().build())
1090            .unwrap();
1091        let result = processor
1092            .process_row_generic(&MockRowBuilder::new().int(7).int(8).null().build())
1093            .unwrap();
1094
1095        let batch = result.expect("batch should be ready");
1096        assert_eq!(batch.num_rows(), 3);
1097        assert_eq!(batch.num_columns(), 3);
1098    }
1099
1100    #[test]
1101    fn test_processor_homogeneous_float64_with_nulls() {
1102        use crate::traits::row::MockRowBuilder;
1103
1104        let schema = Arc::new(Schema::new(vec![
1105            Field::new("col1", DataType::Float64, true),
1106            Field::new("col2", DataType::Float64, true),
1107        ]));
1108        let config = BatchConfig::with_batch_size(2);
1109        let mut processor = HanaBatchProcessor::new(schema, config);
1110
1111        assert!(processor.profile().is_homogeneous());
1112
1113        processor
1114            .process_row_generic(&MockRowBuilder::new().double(1.5).null().build())
1115            .unwrap();
1116        let result = processor
1117            .process_row_generic(&MockRowBuilder::new().null().double(3.5).build())
1118            .unwrap();
1119
1120        let batch = result.expect("batch should be ready");
1121        assert_eq!(batch.num_rows(), 2);
1122    }
1123
1124    #[test]
1125    fn test_processor_homogeneous_decimal128_with_nulls() {
1126        use crate::traits::row::MockRowBuilder;
1127
1128        let schema = Arc::new(Schema::new(vec![
1129            Field::new("col1", DataType::Decimal128(18, 2), true),
1130            Field::new("col2", DataType::Decimal128(18, 2), true),
1131        ]));
1132        let config = BatchConfig::with_batch_size(2);
1133        let mut processor = HanaBatchProcessor::new(schema, config);
1134
1135        assert!(processor.profile().is_homogeneous());
1136
1137        processor
1138            .process_row_generic(&MockRowBuilder::new().decimal_str("100.50").null().build())
1139            .unwrap();
1140        let result = processor
1141            .process_row_generic(&MockRowBuilder::new().null().decimal_str("400.99").build())
1142            .unwrap();
1143
1144        let batch = result.expect("batch should be ready");
1145        assert_eq!(batch.num_rows(), 2);
1146    }
1147
1148    #[test]
1149    fn test_processor_homogeneous_utf8_with_nulls() {
1150        use crate::traits::row::MockRowBuilder;
1151
1152        let schema = Arc::new(Schema::new(vec![
1153            Field::new("col1", DataType::Utf8, true),
1154            Field::new("col2", DataType::Utf8, true),
1155        ]));
1156        let config = BatchConfig::with_batch_size(2);
1157        let mut processor = HanaBatchProcessor::new(schema, config);
1158
1159        assert!(processor.profile().is_homogeneous());
1160
1161        processor
1162            .process_row_generic(&MockRowBuilder::new().string("hello").null().build())
1163            .unwrap();
1164        let result = processor
1165            .process_row_generic(&MockRowBuilder::new().null().string("world").build())
1166            .unwrap();
1167
1168        let batch = result.expect("batch should be ready");
1169        assert_eq!(batch.num_rows(), 2);
1170    }
1171
1172    #[test]
1173    fn test_processor_homogeneous_wide_schema() {
1174        use crate::traits::row::MockRowBuilder;
1175
1176        let mut fields = vec![];
1177        for i in 0..100 {
1178            fields.push(Field::new(&format!("col{}", i), DataType::Int64, false));
1179        }
1180
1181        let schema = Arc::new(Schema::new(fields));
1182        let config = BatchConfig::with_batch_size(1);
1183        let mut processor = HanaBatchProcessor::new(schema, config);
1184
1185        assert!(processor.profile().is_homogeneous());
1186
1187        let mut row_builder = MockRowBuilder::new();
1188        for i in 0..100 {
1189            row_builder = row_builder.bigint(i as i64);
1190        }
1191
1192        let result = processor.process_row_generic(&row_builder.build()).unwrap();
1193
1194        let batch = result.expect("batch should be ready");
1195        assert_eq!(batch.num_rows(), 1);
1196        assert_eq!(batch.num_columns(), 100);
1197    }
1198
1199    #[test]
1200    fn test_processor_homogeneous_unsupported_type_fallback() {
1201        let schema = Arc::new(Schema::new(vec![
1202            Field::new("col1", DataType::Boolean, false),
1203            Field::new("col2", DataType::Boolean, false),
1204        ]));
1205        let config = BatchConfig::with_batch_size(2);
1206        let processor = HanaBatchProcessor::new(schema, config);
1207
1208        assert!(processor.profile().is_homogeneous());
1209    }
1210
1211    #[test]
1212    fn test_processor_multiple_batches_homogeneous() {
1213        use crate::traits::row::MockRowBuilder;
1214
1215        let schema = Arc::new(Schema::new(vec![
1216            Field::new("col1", DataType::Int64, false),
1217            Field::new("col2", DataType::Int64, false),
1218        ]));
1219        let config = BatchConfig::with_batch_size(2);
1220        let mut processor = HanaBatchProcessor::new(schema, config);
1221
1222        assert!(processor.profile().is_homogeneous());
1223
1224        // First batch
1225        processor
1226            .process_row_generic(&MockRowBuilder::new().bigint(1).bigint(2).build())
1227            .unwrap();
1228        let batch1 = processor
1229            .process_row_generic(&MockRowBuilder::new().bigint(3).bigint(4).build())
1230            .unwrap();
1231        assert!(batch1.is_some());
1232
1233        // Second batch
1234        processor
1235            .process_row_generic(&MockRowBuilder::new().bigint(5).bigint(6).build())
1236            .unwrap();
1237        let batch2 = processor
1238            .process_row_generic(&MockRowBuilder::new().bigint(7).bigint(8).build())
1239            .unwrap();
1240        assert!(batch2.is_some());
1241
1242        // Verify both batches
1243        assert_eq!(processor.buffered_rows(), 0);
1244    }
1245
1246    #[test]
1247    fn test_processor_homogeneous_int32_wide() {
1248        use crate::traits::row::MockRowBuilder;
1249
1250        let mut fields = vec![];
1251        for i in 0..50 {
1252            fields.push(Field::new(&format!("col{}", i), DataType::Int32, false));
1253        }
1254
1255        let schema = Arc::new(Schema::new(fields));
1256        let config = BatchConfig::with_batch_size(1);
1257        let mut processor = HanaBatchProcessor::new(schema, config);
1258
1259        assert!(processor.profile().is_homogeneous());
1260
1261        let mut row_builder = MockRowBuilder::new();
1262        for i in 0..50 {
1263            row_builder = row_builder.int(i as i32);
1264        }
1265
1266        let result = processor.process_row_generic(&row_builder.build()).unwrap();
1267
1268        let batch = result.expect("batch should be ready");
1269        assert_eq!(batch.num_rows(), 1);
1270        assert_eq!(batch.num_columns(), 50);
1271    }
1272
1273    #[test]
1274    fn test_processor_homogeneous_float64_wide() {
1275        use crate::traits::row::MockRowBuilder;
1276
1277        let mut fields = vec![];
1278        for i in 0..30 {
1279            fields.push(Field::new(&format!("col{}", i), DataType::Float64, false));
1280        }
1281
1282        let schema = Arc::new(Schema::new(fields));
1283        let config = BatchConfig::with_batch_size(1);
1284        let mut processor = HanaBatchProcessor::new(schema, config);
1285
1286        assert!(processor.profile().is_homogeneous());
1287
1288        let mut row_builder = MockRowBuilder::new();
1289        for i in 0..30 {
1290            row_builder = row_builder.double(i as f64 * 1.5);
1291        }
1292
1293        let result = processor.process_row_generic(&row_builder.build()).unwrap();
1294
1295        let batch = result.expect("batch should be ready");
1296        assert_eq!(batch.num_rows(), 1);
1297        assert_eq!(batch.num_columns(), 30);
1298    }
1299
1300    #[test]
1301    fn test_processor_homogeneous_utf8_wide() {
1302        use crate::traits::row::MockRowBuilder;
1303
1304        let mut fields = vec![];
1305        for i in 0..20 {
1306            fields.push(Field::new(&format!("col{}", i), DataType::Utf8, true));
1307        }
1308
1309        let schema = Arc::new(Schema::new(fields));
1310        let config = BatchConfig::with_batch_size(1);
1311        let mut processor = HanaBatchProcessor::new(schema, config);
1312
1313        assert!(processor.profile().is_homogeneous());
1314
1315        let mut row_builder = MockRowBuilder::new();
1316        for i in 0..20 {
1317            row_builder = row_builder.string(&format!("value{}", i));
1318        }
1319
1320        let result = processor.process_row_generic(&row_builder.build()).unwrap();
1321
1322        let batch = result.expect("batch should be ready");
1323        assert_eq!(batch.num_rows(), 1);
1324        assert_eq!(batch.num_columns(), 20);
1325    }
1326}