1use std::sync::Arc;
6
7use arrow_array::RecordBatch;
8use arrow_schema::{DataType, Schema, SchemaRef, TimeUnit};
9
10use crate::Result;
11use crate::builders::dispatch::{BuilderEnum, BuilderKind};
12use crate::builders::factory::BuilderFactory;
13use crate::traits::builder::HanaCompatibleBuilder;
14use crate::traits::row::RowLike;
15use crate::traits::streaming::BatchConfig;
16
17#[inline]
19fn append_value_to_builder<B: HanaCompatibleBuilder>(
20 builder: &mut B,
21 value: &hdbconnect::HdbValue,
22) -> Result<()> {
23 match value {
24 hdbconnect::HdbValue::NULL => {
25 builder.append_null();
26 Ok(())
27 }
28 v => builder.append_hana_value(v),
29 }
30}
31
/// Expands to a loop that appends one row to builders that are all the
/// unboxed `BuilderEnum::$variant`.
///
/// Arguments: the processor expression (must expose a `builders` field), the
/// row expression (must expose `get(index)`), and the variant identifier.
/// The expansion evaluates to `Ok(())` and uses `?`, so it must be used as the
/// tail of a function returning the crate `Result`.
///
/// Panics via `unreachable!` when a builder is not of the requested variant.
macro_rules! specialize_homogeneous_loop {
    ($self:expr, $row:expr, $variant:ident) => {{
        for (column, slot) in $self.builders.iter_mut().enumerate() {
            match slot {
                BuilderEnum::$variant(typed) => {
                    append_value_to_builder(typed, &$row.get(column))?;
                }
                _ => unreachable!("SchemaProfile guarantees homogeneous type"),
            }
        }
        Ok(())
    }};
}
46
/// Boxed counterpart of `specialize_homogeneous_loop!`: the payload of
/// `BuilderEnum::$variant` is reached through `as_mut()` before appending.
///
/// Takes the same three arguments (processor, row, variant identifier),
/// evaluates to `Ok(())`, and panics via `unreachable!` when a builder is not
/// of the requested variant.
macro_rules! specialize_homogeneous_loop_boxed {
    ($self:expr, $row:expr, $variant:ident) => {{
        for (column, slot) in $self.builders.iter_mut().enumerate() {
            match slot {
                BuilderEnum::$variant(boxed) => {
                    append_value_to_builder(boxed.as_mut(), &$row.get(column))?;
                }
                _ => unreachable!("SchemaProfile guarantees homogeneous type"),
            }
        }
        Ok(())
    }};
}
59
60#[derive(Debug, Clone)]
65pub enum SchemaProfile {
66 Homogeneous {
68 column_count: usize,
70 kind: BuilderKind,
72 },
73 Mixed,
75}
76
77impl SchemaProfile {
78 #[must_use]
80 pub fn analyze(schema: &Schema) -> Self {
81 let fields = schema.fields();
82 if fields.is_empty() {
83 return Self::Mixed;
84 }
85
86 let first_type = fields[0].data_type();
87 let all_same = fields
88 .iter()
89 .skip(1)
90 .all(|f| Self::types_equivalent(first_type, f.data_type()));
91
92 debug_assert!(
94 !all_same
95 || fields.iter().all(|f| {
96 let kind1 = Self::data_type_to_kind(first_type);
97 let kind2 = Self::data_type_to_kind(f.data_type());
98 kind1 == kind2
99 }),
100 "Discriminant-equivalent types must map to same BuilderKind"
101 );
102
103 if all_same {
104 Self::Homogeneous {
105 column_count: fields.len(),
106 kind: Self::data_type_to_kind(first_type),
107 }
108 } else {
109 Self::Mixed
110 }
111 }
112
113 fn types_equivalent(a: &DataType, b: &DataType) -> bool {
123 std::mem::discriminant(a) == std::mem::discriminant(b)
124 }
125
126 #[allow(clippy::match_same_arms, clippy::missing_const_for_fn)]
127 fn data_type_to_kind(dt: &DataType) -> BuilderKind {
128 match dt {
129 DataType::UInt8 => BuilderKind::UInt8,
130 DataType::Int16 => BuilderKind::Int16,
131 DataType::Int32 => BuilderKind::Int32,
132 DataType::Int64 => BuilderKind::Int64,
133 DataType::Float32 => BuilderKind::Float32,
134 DataType::Float64 => BuilderKind::Float64,
135 DataType::Decimal128(_, _) => BuilderKind::Decimal128,
136 DataType::Boolean => BuilderKind::Boolean,
137 DataType::Utf8 => BuilderKind::Utf8,
138 DataType::LargeUtf8 => BuilderKind::LargeUtf8,
139 DataType::Binary => BuilderKind::Binary,
140 DataType::LargeBinary => BuilderKind::LargeBinary,
141 DataType::FixedSizeBinary(_) => BuilderKind::FixedSizeBinary,
142 DataType::Date32 => BuilderKind::Date32,
143 DataType::Time64(TimeUnit::Nanosecond) => BuilderKind::Time64Nanosecond,
144 DataType::Timestamp(TimeUnit::Nanosecond, None) => BuilderKind::TimestampNanosecond,
145 _ => BuilderKind::Utf8,
146 }
147 }
148
149 #[must_use]
151 pub const fn is_homogeneous(&self) -> bool {
152 matches!(self, Self::Homogeneous { .. })
153 }
154}
155
156pub struct HanaBatchProcessor {
183 schema: SchemaRef,
184 config: BatchConfig,
185 builders: Vec<BuilderEnum>,
186 profile: SchemaProfile,
187 row_count: usize,
188}
189
190impl std::fmt::Debug for HanaBatchProcessor {
191 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192 f.debug_struct("HanaBatchProcessor")
193 .field("schema", &self.schema)
194 .field("config", &self.config)
195 .field("builders", &format!("[{} builders]", self.builders.len()))
196 .field("profile", &self.profile)
197 .field("row_count", &self.row_count)
198 .finish()
199 }
200}
201
202impl HanaBatchProcessor {
203 #[must_use]
210 pub fn new(schema: SchemaRef, config: BatchConfig) -> Self {
211 let factory = BuilderFactory::from_config(&config);
212 let builders = factory.create_builders_enum_for_schema_with_metadata(&schema);
213 let profile = SchemaProfile::analyze(&schema);
214
215 Self {
216 schema,
217 config,
218 builders,
219 profile,
220 row_count: 0,
221 }
222 }
223
224 #[must_use]
226 pub fn with_defaults(schema: SchemaRef) -> Self {
227 Self::new(schema, BatchConfig::default())
228 }
229
230 pub fn process_row(&mut self, row: &hdbconnect::Row) -> Result<Option<RecordBatch>> {
239 self.process_row_generic(row)
240 }
241
242 pub fn process_row_generic<R: RowLike>(&mut self, row: &R) -> Result<Option<RecordBatch>> {
263 if row.len() != self.builders.len() {
264 return Err(crate::ArrowConversionError::schema_mismatch(
265 self.builders.len(),
266 row.len(),
267 ));
268 }
269
270 match &self.profile {
271 SchemaProfile::Homogeneous { kind, .. } => {
272 self.process_row_homogeneous(row, *kind)?;
273 }
274 SchemaProfile::Mixed => {
275 self.process_row_mixed(row)?;
276 }
277 }
278
279 self.row_count += 1;
280
281 if self.row_count >= self.config.batch_size.get() {
282 return Ok(Some(self.finish_current_batch()?));
283 }
284
285 Ok(None)
286 }
287
288 fn process_row_homogeneous<R: RowLike>(&mut self, row: &R, kind: BuilderKind) -> Result<()> {
296 match kind {
297 BuilderKind::Int64 => {
298 specialize_homogeneous_loop!(self, row, Int64)
299 }
300 BuilderKind::Decimal128 => {
301 specialize_homogeneous_loop_boxed!(self, row, Decimal128)
302 }
303 BuilderKind::Utf8 => {
304 specialize_homogeneous_loop_boxed!(self, row, Utf8)
305 }
306 BuilderKind::Int32 => {
307 specialize_homogeneous_loop!(self, row, Int32)
308 }
309 BuilderKind::Float64 => {
310 specialize_homogeneous_loop!(self, row, Float64)
311 }
312 _ => self.process_row_mixed(row),
313 }
314 }
315
316 fn process_row_mixed<R: RowLike>(&mut self, row: &R) -> Result<()> {
318 for (i, builder) in self.builders.iter_mut().enumerate() {
319 let value = row.get(i);
320 match value {
321 hdbconnect::HdbValue::NULL => builder.append_null(),
322 v => builder.append_hana_value(v)?,
323 }
324 }
325 Ok(())
326 }
327
328 pub fn flush(&mut self) -> Result<Option<RecordBatch>> {
334 if self.row_count == 0 {
335 return Ok(None);
336 }
337
338 Ok(Some(self.finish_current_batch()?))
339 }
340
341 #[must_use]
343 pub fn schema(&self) -> SchemaRef {
344 Arc::clone(&self.schema)
345 }
346
347 #[must_use]
349 pub const fn buffered_rows(&self) -> usize {
350 self.row_count
351 }
352
353 #[must_use]
355 pub const fn profile(&self) -> &SchemaProfile {
356 &self.profile
357 }
358
359 fn finish_current_batch(&mut self) -> Result<RecordBatch> {
369 let arrays: Vec<_> = self.builders.iter_mut().map(BuilderEnum::finish).collect();
370
371 let batch = RecordBatch::try_new(Arc::clone(&self.schema), arrays)
372 .map_err(|e| crate::ArrowConversionError::value_conversion("batch", e.to_string()))?;
373
374 self.row_count = 0;
375
376 Ok(batch)
377 }
378}
379
#[cfg(test)]
mod tests {
    use arrow_schema::{DataType, Field, Schema};

    use super::*;
    use crate::traits::row::MockRowBuilder;

    // ---------------------------------------------------------------------
    // Shared fixtures
    // ---------------------------------------------------------------------

    /// Wraps `fields` in a shared schema.
    fn shared_schema(fields: Vec<Field>) -> SchemaRef {
        Arc::new(Schema::new(fields))
    }

    /// Schema with a single non-nullable `Int32` column named `id`.
    fn id_schema() -> SchemaRef {
        shared_schema(vec![Field::new("id", DataType::Int32, false)])
    }

    /// Fields named `col<i>` for each index, all with the same type and nullability.
    fn uniform_fields(
        indices: impl IntoIterator<Item = usize>,
        data_type: &DataType,
        nullable: bool,
    ) -> Vec<Field> {
        indices
            .into_iter()
            .map(|i| Field::new(&format!("col{}", i), data_type.clone(), nullable))
            .collect()
    }

    /// Shared schema built from `uniform_fields`.
    fn uniform_schema(
        indices: impl IntoIterator<Item = usize>,
        data_type: &DataType,
        nullable: bool,
    ) -> SchemaRef {
        shared_schema(uniform_fields(indices, data_type, nullable))
    }

    /// Builds the row and feeds it to `processor`, panicking on any error.
    fn feed(processor: &mut HanaBatchProcessor, row: MockRowBuilder) -> Option<RecordBatch> {
        processor.process_row_generic(&row.build()).unwrap()
    }

    /// Asserts that `profile` is homogeneous with the given column count and kind.
    fn assert_homogeneous(
        profile: &SchemaProfile,
        expected_columns: usize,
        expected_kind: BuilderKind,
    ) {
        assert!(profile.is_homogeneous());
        match profile {
            SchemaProfile::Homogeneous { column_count, kind } => {
                assert_eq!(*column_count, expected_columns);
                assert_eq!(*kind, expected_kind);
            }
            SchemaProfile::Mixed => panic!("Expected homogeneous profile"),
        }
    }

    /// Asserts that `profile` is the `Mixed` variant.
    fn assert_mixed(profile: &SchemaProfile) {
        assert!(!profile.is_homogeneous());
        assert!(matches!(profile, SchemaProfile::Mixed));
    }

    /// Asserts the row and column counts of `batch`.
    fn assert_shape(batch: &RecordBatch, rows: usize, columns: usize) {
        assert_eq!(batch.num_rows(), rows);
        assert_eq!(batch.num_columns(), columns);
    }

    // ---------------------------------------------------------------------
    // SchemaProfile::analyze
    // ---------------------------------------------------------------------

    #[test]
    fn test_schema_profile_homogeneous_int64() {
        let schema = Schema::new(uniform_fields(1..=3, &DataType::Int64, false));
        assert_homogeneous(&SchemaProfile::analyze(&schema), 3, BuilderKind::Int64);
    }

    #[test]
    fn test_schema_profile_homogeneous_utf8() {
        let schema = Schema::new(uniform_fields(1..=2, &DataType::Utf8, true));
        assert_homogeneous(&SchemaProfile::analyze(&schema), 2, BuilderKind::Utf8);
    }

    #[test]
    fn test_schema_profile_mixed() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("active", DataType::Boolean, false),
        ]);
        assert_mixed(&SchemaProfile::analyze(&schema));
    }

    #[test]
    fn test_schema_profile_single_column() {
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
        assert_homogeneous(&SchemaProfile::analyze(&schema), 1, BuilderKind::Int32);
    }

    #[test]
    fn test_schema_profile_empty() {
        let schema = Schema::new(Vec::<Field>::new());
        assert_mixed(&SchemaProfile::analyze(&schema));
    }

    #[test]
    fn test_schema_profile_decimal_same_precision_scale() {
        let schema = Schema::new(vec![
            Field::new("price1", DataType::Decimal128(18, 2), false),
            Field::new("price2", DataType::Decimal128(18, 2), false),
        ]);
        assert!(SchemaProfile::analyze(&schema).is_homogeneous());
    }

    #[test]
    fn test_schema_profile_decimal_different_precision() {
        let schema = Schema::new(vec![
            Field::new("price1", DataType::Decimal128(18, 2), false),
            Field::new("price2", DataType::Decimal128(10, 4), false),
        ]);
        assert!(SchemaProfile::analyze(&schema).is_homogeneous());
    }

    // ---------------------------------------------------------------------
    // Construction and accessors
    // ---------------------------------------------------------------------

    #[test]
    fn test_processor_creation() {
        let processor = HanaBatchProcessor::new(id_schema(), BatchConfig::with_batch_size(100));
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_defaults() {
        let processor = HanaBatchProcessor::with_defaults(id_schema());
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_schema() {
        let schema = shared_schema(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let processor = HanaBatchProcessor::with_defaults(Arc::clone(&schema));

        let returned = processor.schema();
        assert_eq!(returned.fields().len(), 2);
        assert_eq!(returned.field(0).name(), "id");
        assert_eq!(returned.field(1).name(), "name");
    }

    #[test]
    fn test_processor_initial_buffered_rows() {
        let processor = HanaBatchProcessor::with_defaults(id_schema());
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_profile_homogeneous() {
        let processor =
            HanaBatchProcessor::with_defaults(uniform_schema(1..=2, &DataType::Int64, false));
        assert!(processor.profile().is_homogeneous());
    }

    #[test]
    fn test_processor_profile_mixed() {
        let schema = shared_schema(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert!(!processor.profile().is_homogeneous());
    }

    // ---------------------------------------------------------------------
    // Configuration variants
    // ---------------------------------------------------------------------

    #[test]
    fn test_processor_with_small_batch_size() {
        let processor = HanaBatchProcessor::new(id_schema(), BatchConfig::with_batch_size(10));
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_large_batch_size() {
        let processor = HanaBatchProcessor::new(id_schema(), BatchConfig::with_batch_size(100000));
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_custom_config() {
        let schema = shared_schema(vec![Field::new("data", DataType::Utf8, true)]);
        let config = BatchConfig::with_batch_size(500)
            .string_capacity(10000)
            .binary_capacity(5000);
        let processor = HanaBatchProcessor::new(schema, config);
        assert_eq!(processor.buffered_rows(), 0);
    }

    // ---------------------------------------------------------------------
    // Schema shapes
    // ---------------------------------------------------------------------

    #[test]
    fn test_processor_with_empty_schema() {
        let processor = HanaBatchProcessor::with_defaults(shared_schema(Vec::new()));
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_single_column_schema() {
        let schema = shared_schema(vec![Field::new("id", DataType::Int64, false)]);
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_multi_column_schema() {
        let schema = shared_schema(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("price", DataType::Decimal128(18, 2), false),
            Field::new("is_active", DataType::Boolean, false),
        ]);
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_all_numeric_types() {
        let schema = shared_schema(vec![
            Field::new("tiny", DataType::UInt8, false),
            Field::new("small", DataType::Int16, false),
            Field::new("int", DataType::Int32, false),
            Field::new("big", DataType::Int64, false),
            Field::new("real", DataType::Float32, false),
            Field::new("double", DataType::Float64, false),
        ]);
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_string_types() {
        let schema = shared_schema(vec![
            Field::new("small_str", DataType::Utf8, true),
            Field::new("large_str", DataType::LargeUtf8, true),
        ]);
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_binary_types() {
        let schema = shared_schema(vec![
            Field::new("bin", DataType::Binary, true),
            Field::new("large_bin", DataType::LargeBinary, true),
            Field::new("fixed_bin", DataType::FixedSizeBinary(16), true),
        ]);
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    // ---------------------------------------------------------------------
    // Flush on an empty processor
    // ---------------------------------------------------------------------

    #[test]
    fn test_processor_flush_empty() {
        let mut processor = HanaBatchProcessor::with_defaults(id_schema());

        let flushed = processor.flush();
        assert!(flushed.is_ok());
        assert!(flushed.unwrap().is_none());
    }

    #[test]
    fn test_processor_flush_multiple_times_when_empty() {
        let mut processor = HanaBatchProcessor::with_defaults(id_schema());

        for _ in 0..3 {
            assert!(processor.flush().unwrap().is_none());
        }
    }

    // ---------------------------------------------------------------------
    // Debug output
    // ---------------------------------------------------------------------

    #[test]
    fn test_processor_debug() {
        let processor = HanaBatchProcessor::with_defaults(id_schema());

        let rendered = format!("{:?}", processor);
        for needle in ["HanaBatchProcessor", "row_count", "builders", "profile"] {
            assert!(rendered.contains(needle));
        }
    }

    // ---------------------------------------------------------------------
    // Schema handle identity
    // ---------------------------------------------------------------------

    #[test]
    fn test_processor_schema_returns_same_schema() {
        let original = shared_schema(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("value", DataType::Float64, true),
        ]);
        let processor = HanaBatchProcessor::with_defaults(Arc::clone(&original));

        let first_handle = processor.schema();
        let second_handle = processor.schema();

        assert!(Arc::ptr_eq(&first_handle, &second_handle));
    }

    // ---------------------------------------------------------------------
    // process_row_generic with mock rows
    // ---------------------------------------------------------------------

    #[test]
    fn test_process_row_generic_with_mock_row() {
        let schema = shared_schema(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(10));

        let row = MockRowBuilder::new().int(42).string("test").build();

        let outcome = processor.process_row_generic(&row);
        assert!(outcome.is_ok());
        assert!(outcome.unwrap().is_none());
        assert_eq!(processor.buffered_rows(), 1);
    }

    #[test]
    fn test_process_row_generic_batch_ready() {
        let mut processor = HanaBatchProcessor::new(id_schema(), BatchConfig::with_batch_size(3));

        for value in 0..3 {
            let outcome = feed(&mut processor, MockRowBuilder::new().int(value));
            if value == 2 {
                // The third row fills the batch.
                let batch = outcome.expect("batch should be ready");
                assert_eq!(batch.num_rows(), 3);
            } else {
                assert!(outcome.is_none());
            }
        }
    }

    #[test]
    fn test_process_row_generic_with_nulls() {
        let schema = shared_schema(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(2));

        let row = MockRowBuilder::new().null().null().build();

        let outcome = processor.process_row_generic(&row);
        assert!(outcome.is_ok());
        assert_eq!(processor.buffered_rows(), 1);
    }

    #[test]
    fn test_process_row_generic_schema_mismatch() {
        let mut processor = HanaBatchProcessor::with_defaults(id_schema());

        // Two values for a one-column schema.
        let row = MockRowBuilder::new().int(1).string("extra").build();

        let outcome = processor.process_row_generic(&row);
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(err.is_schema_mismatch());
    }

    #[test]
    fn test_process_row_generic_flush() {
        let mut processor = HanaBatchProcessor::new(id_schema(), BatchConfig::with_batch_size(100));

        for value in 0..5 {
            feed(&mut processor, MockRowBuilder::new().int(value));
        }

        assert_eq!(processor.buffered_rows(), 5);

        let batch = processor
            .flush()
            .unwrap()
            .expect("should have remaining rows");
        assert_eq!(batch.num_rows(), 5);
        assert_eq!(processor.buffered_rows(), 0);
    }

    // ---------------------------------------------------------------------
    // Builder reuse across batches
    // ---------------------------------------------------------------------

    #[test]
    fn test_builder_reuse_after_finish() {
        let schema = shared_schema(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(2));

        // First batch: ids 0 and 1.
        for id in 0..2 {
            let outcome = feed(&mut processor, MockRowBuilder::new().int(id).string("test"));
            if id == 1 {
                assert!(outcome.is_some(), "First batch should be ready");
            }
        }

        // Second batch: ids 2 and 3, which must not contain rows from the first.
        for id in 2..4 {
            let outcome = feed(&mut processor, MockRowBuilder::new().int(id).string("test2"));
            if id == 3 {
                let batch = outcome.expect("Second batch should be ready");
                assert_eq!(batch.num_rows(), 2);
                let ids = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<arrow_array::Int32Array>()
                    .unwrap();
                assert_eq!(ids.value(0), 2);
                assert_eq!(ids.value(1), 3);
            }
        }
    }

    // ---------------------------------------------------------------------
    // Homogeneous and mixed processing
    // ---------------------------------------------------------------------

    #[test]
    fn test_processor_homogeneous_int64() {
        let mut processor = HanaBatchProcessor::new(
            uniform_schema(1..=3, &DataType::Int64, false),
            BatchConfig::with_batch_size(3),
        );

        assert!(processor.profile().is_homogeneous());

        feed(&mut processor, MockRowBuilder::new().bigint(1).bigint(2).bigint(3));
        feed(&mut processor, MockRowBuilder::new().bigint(4).bigint(5).bigint(6));
        let outcome = feed(&mut processor, MockRowBuilder::new().bigint(7).bigint(8).bigint(9));

        assert_shape(&outcome.expect("batch should be ready"), 3, 3);
    }

    #[test]
    fn test_processor_homogeneous_int32() {
        let mut processor = HanaBatchProcessor::new(
            uniform_schema(1..=3, &DataType::Int32, false),
            BatchConfig::with_batch_size(2),
        );

        assert!(processor.profile().is_homogeneous());

        feed(&mut processor, MockRowBuilder::new().int(10).int(20).int(30));
        let outcome = feed(&mut processor, MockRowBuilder::new().int(40).int(50).int(60));

        assert_shape(&outcome.expect("batch should be ready"), 2, 3);
    }

    #[test]
    fn test_processor_homogeneous_float64() {
        let mut processor = HanaBatchProcessor::new(
            uniform_schema(1..=2, &DataType::Float64, false),
            BatchConfig::with_batch_size(2),
        );

        assert!(processor.profile().is_homogeneous());

        feed(&mut processor, MockRowBuilder::new().double(1.5).double(2.5));
        let outcome = feed(&mut processor, MockRowBuilder::new().double(3.5).double(4.5));

        assert_shape(&outcome.expect("batch should be ready"), 2, 2);
    }

    #[test]
    #[cfg(feature = "test-utils")]
    fn test_processor_homogeneous_decimal128() {
        let mut processor = HanaBatchProcessor::new(
            uniform_schema(1..=2, &DataType::Decimal128(18, 2), false),
            BatchConfig::with_batch_size(2),
        );

        assert!(processor.profile().is_homogeneous());

        feed(
            &mut processor,
            MockRowBuilder::new()
                .decimal_str("100.50")
                .decimal_str("200.75"),
        );
        let outcome = feed(
            &mut processor,
            MockRowBuilder::new()
                .decimal_str("300.25")
                .decimal_str("400.99"),
        );

        assert_shape(&outcome.expect("batch should be ready"), 2, 2);
    }

    #[test]
    fn test_processor_homogeneous_utf8() {
        let mut processor = HanaBatchProcessor::new(
            uniform_schema(1..=3, &DataType::Utf8, true),
            BatchConfig::with_batch_size(2),
        );

        assert!(processor.profile().is_homogeneous());

        feed(
            &mut processor,
            MockRowBuilder::new()
                .string("alice")
                .string("bob")
                .string("charlie"),
        );
        let outcome = feed(
            &mut processor,
            MockRowBuilder::new()
                .string("diana")
                .string("eve")
                .string("frank"),
        );

        assert_shape(&outcome.expect("batch should be ready"), 2, 3);
    }

    #[test]
    fn test_processor_mixed_schema() {
        let schema = shared_schema(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("active", DataType::Boolean, false),
        ]);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(2));

        assert!(!processor.profile().is_homogeneous());

        feed(
            &mut processor,
            MockRowBuilder::new()
                .bigint(1)
                .string("Alice")
                .boolean(true),
        );
        let outcome = feed(
            &mut processor,
            MockRowBuilder::new()
                .bigint(2)
                .string("Bob")
                .boolean(false),
        );

        assert_shape(&outcome.expect("batch should be ready"), 2, 3);
    }

    #[test]
    fn test_processor_homogeneous_with_nulls() {
        let mut processor = HanaBatchProcessor::new(
            uniform_schema(1..=2, &DataType::Int64, true),
            BatchConfig::with_batch_size(2),
        );

        assert!(processor.profile().is_homogeneous());

        feed(&mut processor, MockRowBuilder::new().bigint(1).null());
        let outcome = feed(&mut processor, MockRowBuilder::new().null().bigint(2));

        let batch = outcome.expect("batch should be ready");
        assert_eq!(batch.num_rows(), 2);
    }

    #[test]
    fn test_processor_homogeneous_int32_with_nulls() {
        let mut processor = HanaBatchProcessor::new(
            uniform_schema(1..=3, &DataType::Int32, true),
            BatchConfig::with_batch_size(3),
        );

        assert!(processor.profile().is_homogeneous());

        feed(&mut processor, MockRowBuilder::new().int(1).null().int(3));
        feed(&mut processor, MockRowBuilder::new().null().int(5).null());
        let outcome = feed(&mut processor, MockRowBuilder::new().int(7).int(8).null());

        assert_shape(&outcome.expect("batch should be ready"), 3, 3);
    }

    #[test]
    fn test_processor_homogeneous_float64_with_nulls() {
        let mut processor = HanaBatchProcessor::new(
            uniform_schema(1..=2, &DataType::Float64, true),
            BatchConfig::with_batch_size(2),
        );

        assert!(processor.profile().is_homogeneous());

        feed(&mut processor, MockRowBuilder::new().double(1.5).null());
        let outcome = feed(&mut processor, MockRowBuilder::new().null().double(3.5));

        let batch = outcome.expect("batch should be ready");
        assert_eq!(batch.num_rows(), 2);
    }

    #[test]
    #[cfg(feature = "test-utils")]
    fn test_processor_homogeneous_decimal128_with_nulls() {
        let mut processor = HanaBatchProcessor::new(
            uniform_schema(1..=2, &DataType::Decimal128(18, 2), true),
            BatchConfig::with_batch_size(2),
        );

        assert!(processor.profile().is_homogeneous());

        feed(&mut processor, MockRowBuilder::new().decimal_str("100.50").null());
        let outcome = feed(&mut processor, MockRowBuilder::new().null().decimal_str("400.99"));

        let batch = outcome.expect("batch should be ready");
        assert_eq!(batch.num_rows(), 2);
    }

    #[test]
    fn test_processor_homogeneous_utf8_with_nulls() {
        let mut processor = HanaBatchProcessor::new(
            uniform_schema(1..=2, &DataType::Utf8, true),
            BatchConfig::with_batch_size(2),
        );

        assert!(processor.profile().is_homogeneous());

        feed(&mut processor, MockRowBuilder::new().string("hello").null());
        let outcome = feed(&mut processor, MockRowBuilder::new().null().string("world"));

        let batch = outcome.expect("batch should be ready");
        assert_eq!(batch.num_rows(), 2);
    }

    #[test]
    fn test_processor_homogeneous_wide_schema() {
        let mut processor = HanaBatchProcessor::new(
            uniform_schema(0..100, &DataType::Int64, false),
            BatchConfig::with_batch_size(1),
        );

        assert!(processor.profile().is_homogeneous());

        let row = (0..100_i64).fold(MockRowBuilder::new(), |acc, value| acc.bigint(value));
        let outcome = feed(&mut processor, row);

        assert_shape(&outcome.expect("batch should be ready"), 1, 100);
    }

    #[test]
    fn test_processor_homogeneous_unsupported_type_fallback() {
        // Boolean has no specialised loop; only the profile is checked here.
        let processor = HanaBatchProcessor::new(
            uniform_schema(1..=2, &DataType::Boolean, false),
            BatchConfig::with_batch_size(2),
        );

        assert!(processor.profile().is_homogeneous());
    }

    #[test]
    fn test_processor_multiple_batches_homogeneous() {
        let mut processor = HanaBatchProcessor::new(
            uniform_schema(1..=2, &DataType::Int64, false),
            BatchConfig::with_batch_size(2),
        );

        assert!(processor.profile().is_homogeneous());

        // First batch.
        feed(&mut processor, MockRowBuilder::new().bigint(1).bigint(2));
        let first_batch = feed(&mut processor, MockRowBuilder::new().bigint(3).bigint(4));
        assert!(first_batch.is_some());

        // Second batch.
        feed(&mut processor, MockRowBuilder::new().bigint(5).bigint(6));
        let second_batch = feed(&mut processor, MockRowBuilder::new().bigint(7).bigint(8));
        assert!(second_batch.is_some());

        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_homogeneous_int32_wide() {
        let mut processor = HanaBatchProcessor::new(
            uniform_schema(0..50, &DataType::Int32, false),
            BatchConfig::with_batch_size(1),
        );

        assert!(processor.profile().is_homogeneous());

        let row = (0..50_i32).fold(MockRowBuilder::new(), |acc, value| acc.int(value));
        let outcome = feed(&mut processor, row);

        assert_shape(&outcome.expect("batch should be ready"), 1, 50);
    }

    #[test]
    fn test_processor_homogeneous_float64_wide() {
        let mut processor = HanaBatchProcessor::new(
            uniform_schema(0..30, &DataType::Float64, false),
            BatchConfig::with_batch_size(1),
        );

        assert!(processor.profile().is_homogeneous());

        let row = (0..30_i32).fold(MockRowBuilder::new(), |acc, step| {
            acc.double(f64::from(step) * 1.5)
        });
        let outcome = feed(&mut processor, row);

        assert_shape(&outcome.expect("batch should be ready"), 1, 30);
    }

    #[test]
    fn test_processor_homogeneous_utf8_wide() {
        let mut processor = HanaBatchProcessor::new(
            uniform_schema(0..20, &DataType::Utf8, true),
            BatchConfig::with_batch_size(1),
        );

        assert!(processor.profile().is_homogeneous());

        let row = (0..20).fold(MockRowBuilder::new(), |acc, i| {
            acc.string(&format!("value{}", i))
        });
        let outcome = feed(&mut processor, row);

        assert_shape(&outcome.expect("batch should be ready"), 1, 20);
    }
}