1use std::sync::Arc;
6
7use arrow_array::RecordBatch;
8use arrow_schema::{DataType, Schema, SchemaRef, TimeUnit};
9
10use crate::Result;
11use crate::builders::dispatch::{BuilderEnum, BuilderKind};
12use crate::builders::factory::BuilderFactory;
13use crate::traits::builder::HanaCompatibleBuilder;
14use crate::traits::row::RowLike;
15use crate::traits::streaming::BatchConfig;
16
17#[inline]
19fn append_value_to_builder<B: HanaCompatibleBuilder>(
20 builder: &mut B,
21 value: &hdbconnect::HdbValue,
22) -> Result<()> {
23 match value {
24 hdbconnect::HdbValue::NULL => {
25 builder.append_null();
26 Ok(())
27 }
28 v => builder.append_hana_value(v),
29 }
30}
31
/// Appends every column of `$row` through the unboxed `BuilderEnum::$variant` builders.
///
/// Expands to an expression of type `Result<()>`: the first append error stops the
/// iteration and becomes the value of the expression. A builder holding any other
/// variant hits `unreachable!`.
macro_rules! specialize_homogeneous_loop {
    ($self:expr, $row:expr, $variant:ident) => {{
        $self
            .builders
            .iter_mut()
            .enumerate()
            .try_for_each(|(column, slot)| {
                let BuilderEnum::$variant(typed) = slot else {
                    unreachable!("SchemaProfile guarantees homogeneous type")
                };
                append_value_to_builder(typed, &$row.get(column))
            })
    }};
}
46
/// Same as `specialize_homogeneous_loop!`, for `BuilderEnum` variants that box their builder.
///
/// Expands to an expression of type `Result<()>`: the first append error stops the
/// iteration and becomes the value of the expression. A builder holding any other
/// variant hits `unreachable!`.
macro_rules! specialize_homogeneous_loop_boxed {
    ($self:expr, $row:expr, $variant:ident) => {{
        $self
            .builders
            .iter_mut()
            .enumerate()
            .try_for_each(|(column, slot)| {
                let BuilderEnum::$variant(boxed) = slot else {
                    unreachable!("SchemaProfile guarantees homogeneous type")
                };
                // Reach through the box so the generic helper sees the concrete builder.
                append_value_to_builder(boxed.as_mut(), &$row.get(column))
            })
    }};
}
59
/// Result of analyzing a schema's column types, used to pick a row-processing path.
///
/// Produced by [`SchemaProfile::analyze`].
#[derive(Debug, Clone)]
pub enum SchemaProfile {
    /// All fields were judged type-equivalent to the first field.
    Homogeneous {
        /// Number of fields in the analyzed schema.
        column_count: usize,
        /// Builder kind derived from the first field's data type.
        kind: BuilderKind,
    },
    /// The fields are not all type-equivalent, or the schema has no fields.
    Mixed,
}
76
77impl SchemaProfile {
78 #[must_use]
80 pub fn analyze(schema: &Schema) -> Self {
81 let fields = schema.fields();
82 if fields.is_empty() {
83 return Self::Mixed;
84 }
85
86 let first_type = fields[0].data_type();
87 let all_same = fields
88 .iter()
89 .skip(1)
90 .all(|f| Self::types_equivalent(first_type, f.data_type()));
91
92 debug_assert!(
94 !all_same
95 || fields.iter().all(|f| {
96 let kind1 = Self::data_type_to_kind(first_type);
97 let kind2 = Self::data_type_to_kind(f.data_type());
98 kind1 == kind2
99 }),
100 "Discriminant-equivalent types must map to same BuilderKind"
101 );
102
103 if all_same {
104 Self::Homogeneous {
105 column_count: fields.len(),
106 kind: Self::data_type_to_kind(first_type),
107 }
108 } else {
109 Self::Mixed
110 }
111 }
112
113 fn types_equivalent(a: &DataType, b: &DataType) -> bool {
123 std::mem::discriminant(a) == std::mem::discriminant(b)
124 }
125
126 #[allow(clippy::match_same_arms, clippy::missing_const_for_fn)]
127 fn data_type_to_kind(dt: &DataType) -> BuilderKind {
128 match dt {
129 DataType::UInt8 => BuilderKind::UInt8,
130 DataType::Int16 => BuilderKind::Int16,
131 DataType::Int32 => BuilderKind::Int32,
132 DataType::Int64 => BuilderKind::Int64,
133 DataType::Float32 => BuilderKind::Float32,
134 DataType::Float64 => BuilderKind::Float64,
135 DataType::Decimal128(_, _) => BuilderKind::Decimal128,
136 DataType::Boolean => BuilderKind::Boolean,
137 DataType::Utf8 => BuilderKind::Utf8,
138 DataType::LargeUtf8 => BuilderKind::LargeUtf8,
139 DataType::Binary => BuilderKind::Binary,
140 DataType::LargeBinary => BuilderKind::LargeBinary,
141 DataType::FixedSizeBinary(_) => BuilderKind::FixedSizeBinary,
142 DataType::Date32 => BuilderKind::Date32,
143 DataType::Time64(TimeUnit::Nanosecond) => BuilderKind::Time64Nanosecond,
144 DataType::Timestamp(TimeUnit::Nanosecond, None) => BuilderKind::TimestampNanosecond,
145 _ => BuilderKind::Utf8,
146 }
147 }
148
149 #[must_use]
151 pub const fn is_homogeneous(&self) -> bool {
152 matches!(self, Self::Homogeneous { .. })
153 }
154}
155
/// Accumulates HANA rows into column builders and emits Arrow `RecordBatch`es.
///
/// Rows are appended one at a time; a batch is produced once the configured batch size
/// is reached, or on demand through `flush`.
pub struct HanaBatchProcessor {
    /// Schema attached to every emitted batch.
    schema: SchemaRef,
    /// Configuration; `batch_size` is the row threshold that triggers a batch.
    config: BatchConfig,
    /// Column builders created from the schema; a row must have exactly this many values.
    builders: Vec<BuilderEnum>,
    /// Schema classification computed once at construction and used for row dispatch.
    profile: SchemaProfile,
    /// Number of rows appended since the last emitted batch.
    row_count: usize,
}
189
190impl std::fmt::Debug for HanaBatchProcessor {
191 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192 f.debug_struct("HanaBatchProcessor")
193 .field("schema", &self.schema)
194 .field("config", &self.config)
195 .field("builders", &format!("[{} builders]", self.builders.len()))
196 .field("profile", &self.profile)
197 .field("row_count", &self.row_count)
198 .finish()
199 }
200}
201
202impl HanaBatchProcessor {
203 #[must_use]
210 pub fn new(schema: SchemaRef, config: BatchConfig) -> Self {
211 let factory = BuilderFactory::from_config(&config);
212 let builders = factory.create_builders_enum_for_schema_with_metadata(&schema);
213 let profile = SchemaProfile::analyze(&schema);
214
215 Self {
216 schema,
217 config,
218 builders,
219 profile,
220 row_count: 0,
221 }
222 }
223
224 #[must_use]
226 pub fn with_defaults(schema: SchemaRef) -> Self {
227 Self::new(schema, BatchConfig::default())
228 }
229
230 pub fn process_row(&mut self, row: &hdbconnect::Row) -> Result<Option<RecordBatch>> {
239 self.process_row_generic(row)
240 }
241
242 pub fn process_row_generic<R: RowLike>(&mut self, row: &R) -> Result<Option<RecordBatch>> {
263 if row.len() != self.builders.len() {
264 return Err(crate::ArrowConversionError::schema_mismatch(
265 self.builders.len(),
266 row.len(),
267 ));
268 }
269
270 match &self.profile {
271 SchemaProfile::Homogeneous { kind, .. } => {
272 self.process_row_homogeneous(row, *kind)?;
273 }
274 SchemaProfile::Mixed => {
275 self.process_row_mixed(row)?;
276 }
277 }
278
279 self.row_count += 1;
280
281 if self.row_count >= self.config.batch_size.get() {
282 return Ok(Some(self.finish_current_batch()?));
283 }
284
285 Ok(None)
286 }
287
288 fn process_row_homogeneous<R: RowLike>(&mut self, row: &R, kind: BuilderKind) -> Result<()> {
296 match kind {
297 BuilderKind::Int64 => {
298 specialize_homogeneous_loop!(self, row, Int64)
299 }
300 BuilderKind::Decimal128 => {
301 specialize_homogeneous_loop_boxed!(self, row, Decimal128)
302 }
303 BuilderKind::Utf8 => {
304 specialize_homogeneous_loop_boxed!(self, row, Utf8)
305 }
306 BuilderKind::Int32 => {
307 specialize_homogeneous_loop!(self, row, Int32)
308 }
309 BuilderKind::Float64 => {
310 specialize_homogeneous_loop!(self, row, Float64)
311 }
312 _ => self.process_row_mixed(row),
313 }
314 }
315
316 fn process_row_mixed<R: RowLike>(&mut self, row: &R) -> Result<()> {
318 for (i, builder) in self.builders.iter_mut().enumerate() {
319 let value = row.get(i);
320 match value {
321 hdbconnect::HdbValue::NULL => builder.append_null(),
322 v => builder.append_hana_value(v)?,
323 }
324 }
325 Ok(())
326 }
327
328 pub fn flush(&mut self) -> Result<Option<RecordBatch>> {
334 if self.row_count == 0 {
335 return Ok(None);
336 }
337
338 Ok(Some(self.finish_current_batch()?))
339 }
340
341 #[must_use]
343 pub fn schema(&self) -> SchemaRef {
344 Arc::clone(&self.schema)
345 }
346
347 #[must_use]
349 pub const fn buffered_rows(&self) -> usize {
350 self.row_count
351 }
352
353 #[must_use]
355 pub const fn profile(&self) -> &SchemaProfile {
356 &self.profile
357 }
358
359 fn finish_current_batch(&mut self) -> Result<RecordBatch> {
369 let arrays: Vec<_> = self.builders.iter_mut().map(BuilderEnum::finish).collect();
370
371 let batch = RecordBatch::try_new(Arc::clone(&self.schema), arrays)
372 .map_err(|e| crate::ArrowConversionError::value_conversion("batch", e.to_string()))?;
373
374 self.row_count = 0;
375
376 Ok(batch)
377 }
378}
379
#[cfg(test)]
mod tests {
    use arrow_schema::{DataType, Field, Schema};

    use super::*;
    use crate::traits::row::MockRowBuilder;

    // ---------------------------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------------------------

    /// Wraps `fields` in a shared schema.
    fn schema_of(fields: Vec<Field>) -> SchemaRef {
        Arc::new(Schema::new(fields))
    }

    /// Schema with a single non-nullable `Int32` column named `id`.
    fn id_schema() -> SchemaRef {
        schema_of(vec![Field::new("id", DataType::Int32, false)])
    }

    /// Schema of `columns` identically typed fields named `col0`, `col1`, ...
    fn uniform_schema(data_type: &DataType, columns: usize, nullable: bool) -> SchemaRef {
        schema_of(
            (0..columns)
                .map(|i| Field::new(format!("col{i}"), data_type.clone(), nullable))
                .collect(),
        )
    }

    /// Feeds `rows` in order and returns the outcome of the last one; every earlier row
    /// must be buffered without completing a batch.
    fn feed<R: RowLike>(
        processor: &mut HanaBatchProcessor,
        rows: impl IntoIterator<Item = R>,
    ) -> Option<RecordBatch> {
        let mut last = None;
        for row in rows {
            assert!(last.is_none(), "batch completed before the final row");
            last = processor.process_row_generic(&row).unwrap();
        }
        last
    }

    /// Unwraps a completed batch and checks its dimensions.
    fn expect_batch(result: Option<RecordBatch>, rows: usize, columns: usize) -> RecordBatch {
        let batch = result.expect("batch should be ready");
        assert_eq!(batch.num_rows(), rows);
        assert_eq!(batch.num_columns(), columns);
        batch
    }

    /// Asserts the null count of every column, in column order.
    fn assert_null_counts(batch: &RecordBatch, expected: &[usize]) {
        let actual: Vec<usize> = batch.columns().iter().map(|c| c.null_count()).collect();
        assert_eq!(actual, expected);
    }

    /// Asserts that `profile` is homogeneous with the given column count and kind.
    fn assert_homogeneous(profile: &SchemaProfile, columns: usize, expected: BuilderKind) {
        match profile {
            SchemaProfile::Homogeneous { column_count, kind } => {
                assert_eq!(*column_count, columns);
                assert_eq!(*kind, expected);
            }
            SchemaProfile::Mixed => panic!("Expected homogeneous profile"),
        }
    }

    // ---------------------------------------------------------------------------------
    // SchemaProfile::analyze
    // ---------------------------------------------------------------------------------

    #[test]
    fn test_schema_profile_homogeneous_int64() {
        let schema = uniform_schema(&DataType::Int64, 3, false);

        let profile = SchemaProfile::analyze(&schema);
        assert!(profile.is_homogeneous());
        assert_homogeneous(&profile, 3, BuilderKind::Int64);
    }

    #[test]
    fn test_schema_profile_homogeneous_utf8() {
        let schema = uniform_schema(&DataType::Utf8, 2, true);

        let profile = SchemaProfile::analyze(&schema);
        assert!(profile.is_homogeneous());
        assert_homogeneous(&profile, 2, BuilderKind::Utf8);
    }

    #[test]
    fn test_schema_profile_mixed() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("active", DataType::Boolean, false),
        ]);

        let profile = SchemaProfile::analyze(&schema);
        assert!(!profile.is_homogeneous());
        assert!(matches!(profile, SchemaProfile::Mixed));
    }

    #[test]
    fn test_schema_profile_single_column() {
        let schema = id_schema();

        let profile = SchemaProfile::analyze(&schema);
        assert!(profile.is_homogeneous());
        assert_homogeneous(&profile, 1, BuilderKind::Int32);
    }

    #[test]
    fn test_schema_profile_empty() {
        let schema = Schema::new(Vec::<Field>::new());

        let profile = SchemaProfile::analyze(&schema);
        assert!(!profile.is_homogeneous());
        assert!(matches!(profile, SchemaProfile::Mixed));
    }

    #[test]
    fn test_schema_profile_decimal_same_precision_scale() {
        let schema = uniform_schema(&DataType::Decimal128(18, 2), 2, false);

        let profile = SchemaProfile::analyze(&schema);
        assert!(profile.is_homogeneous());
    }

    #[test]
    fn test_schema_profile_decimal_different_precision() {
        // Precision and scale do not affect the builder kind, so this still counts as
        // homogeneous.
        let schema = Schema::new(vec![
            Field::new("price1", DataType::Decimal128(18, 2), false),
            Field::new("price2", DataType::Decimal128(10, 4), false),
        ]);

        let profile = SchemaProfile::analyze(&schema);
        assert!(profile.is_homogeneous());
    }

    // ---------------------------------------------------------------------------------
    // Construction and accessors
    // ---------------------------------------------------------------------------------

    #[test]
    fn test_processor_creation() {
        let processor = HanaBatchProcessor::new(id_schema(), BatchConfig::with_batch_size(100));
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_defaults() {
        let processor = HanaBatchProcessor::with_defaults(id_schema());
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_schema() {
        let schema = schema_of(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let processor = HanaBatchProcessor::with_defaults(schema);

        let returned_schema = processor.schema();
        assert_eq!(returned_schema.fields().len(), 2);
        assert_eq!(returned_schema.field(0).name(), "id");
        assert_eq!(returned_schema.field(1).name(), "name");
    }

    #[test]
    fn test_processor_initial_buffered_rows() {
        let processor = HanaBatchProcessor::with_defaults(id_schema());
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_profile_homogeneous() {
        let processor =
            HanaBatchProcessor::with_defaults(uniform_schema(&DataType::Int64, 2, false));
        assert!(processor.profile().is_homogeneous());
    }

    #[test]
    fn test_processor_profile_mixed() {
        let schema = schema_of(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert!(!processor.profile().is_homogeneous());
    }

    // ---------------------------------------------------------------------------------
    // Configuration variants
    // ---------------------------------------------------------------------------------

    #[test]
    fn test_processor_with_small_batch_size() {
        let processor = HanaBatchProcessor::new(id_schema(), BatchConfig::with_batch_size(10));
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_large_batch_size() {
        let processor = HanaBatchProcessor::new(id_schema(), BatchConfig::with_batch_size(100000));
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_custom_config() {
        let schema = schema_of(vec![Field::new("data", DataType::Utf8, true)]);
        let config = BatchConfig::with_batch_size(500)
            .string_capacity(10000)
            .binary_capacity(5000);
        let processor = HanaBatchProcessor::new(schema, config);
        assert_eq!(processor.buffered_rows(), 0);
    }

    // ---------------------------------------------------------------------------------
    // Schema shapes
    // ---------------------------------------------------------------------------------

    #[test]
    fn test_processor_with_empty_schema() {
        let processor = HanaBatchProcessor::with_defaults(schema_of(Vec::new()));
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_single_column_schema() {
        let schema = schema_of(vec![Field::new("id", DataType::Int64, false)]);
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_multi_column_schema() {
        let schema = schema_of(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("price", DataType::Decimal128(18, 2), false),
            Field::new("is_active", DataType::Boolean, false),
        ]);
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_all_numeric_types() {
        let schema = schema_of(vec![
            Field::new("tiny", DataType::UInt8, false),
            Field::new("small", DataType::Int16, false),
            Field::new("int", DataType::Int32, false),
            Field::new("big", DataType::Int64, false),
            Field::new("real", DataType::Float32, false),
            Field::new("double", DataType::Float64, false),
        ]);
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_string_types() {
        let schema = schema_of(vec![
            Field::new("small_str", DataType::Utf8, true),
            Field::new("large_str", DataType::LargeUtf8, true),
        ]);
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_binary_types() {
        let schema = schema_of(vec![
            Field::new("bin", DataType::Binary, true),
            Field::new("large_bin", DataType::LargeBinary, true),
            Field::new("fixed_bin", DataType::FixedSizeBinary(16), true),
        ]);
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    // ---------------------------------------------------------------------------------
    // flush
    // ---------------------------------------------------------------------------------

    #[test]
    fn test_processor_flush_empty() {
        let mut processor = HanaBatchProcessor::with_defaults(id_schema());

        let result = processor.flush();
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[test]
    fn test_processor_flush_multiple_times_when_empty() {
        let mut processor = HanaBatchProcessor::with_defaults(id_schema());

        assert!(processor.flush().unwrap().is_none());
        assert!(processor.flush().unwrap().is_none());
        assert!(processor.flush().unwrap().is_none());
    }

    // ---------------------------------------------------------------------------------
    // Debug
    // ---------------------------------------------------------------------------------

    #[test]
    fn test_processor_debug() {
        let processor = HanaBatchProcessor::with_defaults(id_schema());

        let debug_str = format!("{processor:?}");
        assert!(debug_str.contains("HanaBatchProcessor"));
        assert!(debug_str.contains("row_count"));
        assert!(debug_str.contains("builders"));
        assert!(debug_str.contains("profile"));
    }

    // ---------------------------------------------------------------------------------
    // schema()
    // ---------------------------------------------------------------------------------

    #[test]
    fn test_processor_schema_returns_same_schema() {
        let original_schema = schema_of(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("value", DataType::Float64, true),
        ]);
        let processor = HanaBatchProcessor::with_defaults(Arc::clone(&original_schema));

        let schema1 = processor.schema();
        let schema2 = processor.schema();

        assert!(Arc::ptr_eq(&schema1, &schema2));
        assert!(Arc::ptr_eq(&schema1, &original_schema));
    }

    // ---------------------------------------------------------------------------------
    // process_row_generic
    // ---------------------------------------------------------------------------------

    #[test]
    fn test_process_row_generic_with_mock_row() {
        let schema = schema_of(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(10));

        let row = MockRowBuilder::new().int(42).string("test").build();

        let result = processor.process_row_generic(&row);
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
        assert_eq!(processor.buffered_rows(), 1);
    }

    #[test]
    fn test_process_row_generic_batch_ready() {
        let mut processor = HanaBatchProcessor::new(id_schema(), BatchConfig::with_batch_size(3));

        for i in 0..3 {
            let row = MockRowBuilder::new().int(i).build();
            let result = processor.process_row_generic(&row).unwrap();
            if i < 2 {
                assert!(result.is_none());
            } else {
                let batch = result.expect("batch should be ready");
                assert_eq!(batch.num_rows(), 3);
            }
        }
    }

    #[test]
    fn test_process_row_generic_with_nulls() {
        let schema = schema_of(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(2));

        let row = MockRowBuilder::new().null().null().build();

        let result = processor.process_row_generic(&row);
        assert!(result.is_ok());
        assert_eq!(processor.buffered_rows(), 1);
    }

    #[test]
    fn test_process_row_generic_schema_mismatch() {
        let mut processor = HanaBatchProcessor::with_defaults(id_schema());

        // Two values for a one-column schema.
        let row = MockRowBuilder::new().int(1).string("extra").build();

        let result = processor.process_row_generic(&row);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.is_schema_mismatch());
    }

    #[test]
    fn test_process_row_generic_flush() {
        let mut processor = HanaBatchProcessor::new(id_schema(), BatchConfig::with_batch_size(100));

        for i in 0..5 {
            let row = MockRowBuilder::new().int(i).build();
            processor.process_row_generic(&row).unwrap();
        }

        assert_eq!(processor.buffered_rows(), 5);

        let batch = processor
            .flush()
            .unwrap()
            .expect("should have remaining rows");
        assert_eq!(batch.num_rows(), 5);
        assert_eq!(processor.buffered_rows(), 0);
    }

    // ---------------------------------------------------------------------------------
    // Builder reuse
    // ---------------------------------------------------------------------------------

    #[test]
    fn test_builder_reuse_after_finish() {
        let schema = schema_of(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(2));

        for i in 0..2 {
            let row = MockRowBuilder::new().int(i).string("test").build();
            let result = processor.process_row_generic(&row).unwrap();
            if i == 1 {
                assert!(result.is_some(), "First batch should be ready");
            }
        }

        for i in 2..4 {
            let row = MockRowBuilder::new().int(i).string("test2").build();
            let result = processor.process_row_generic(&row).unwrap();
            if i == 3 {
                let batch = result.expect("Second batch should be ready");
                assert_eq!(batch.num_rows(), 2);
                // Only the rows of the second round may appear in the second batch.
                let id_array = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<arrow_array::Int32Array>()
                    .unwrap();
                assert_eq!(id_array.value(0), 2);
                assert_eq!(id_array.value(1), 3);
            }
        }
    }

    // ---------------------------------------------------------------------------------
    // Homogeneous fast paths
    // ---------------------------------------------------------------------------------

    #[test]
    fn test_processor_homogeneous_int64() {
        let schema = uniform_schema(&DataType::Int64, 3, false);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(3));

        assert!(processor.profile().is_homogeneous());

        let result = feed(
            &mut processor,
            [
                MockRowBuilder::new().bigint(1).bigint(2).bigint(3).build(),
                MockRowBuilder::new().bigint(4).bigint(5).bigint(6).build(),
                MockRowBuilder::new().bigint(7).bigint(8).bigint(9).build(),
            ],
        );

        let batch = expect_batch(result, 3, 3);
        let first_column = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::Int64Array>()
            .expect("first column should be Int64");
        assert_eq!(first_column.value(0), 1);
        assert_eq!(first_column.value(1), 4);
        assert_eq!(first_column.value(2), 7);
    }

    #[test]
    fn test_processor_homogeneous_int32() {
        let schema = uniform_schema(&DataType::Int32, 3, false);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(2));

        assert!(processor.profile().is_homogeneous());

        let result = feed(
            &mut processor,
            [
                MockRowBuilder::new().int(10).int(20).int(30).build(),
                MockRowBuilder::new().int(40).int(50).int(60).build(),
            ],
        );

        expect_batch(result, 2, 3);
    }

    #[test]
    fn test_processor_homogeneous_float64() {
        let schema = uniform_schema(&DataType::Float64, 2, false);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(2));

        assert!(processor.profile().is_homogeneous());

        let result = feed(
            &mut processor,
            [
                MockRowBuilder::new().double(1.5).double(2.5).build(),
                MockRowBuilder::new().double(3.5).double(4.5).build(),
            ],
        );

        expect_batch(result, 2, 2);
    }

    #[test]
    fn test_processor_homogeneous_decimal128() {
        let schema = uniform_schema(&DataType::Decimal128(18, 2), 2, false);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(2));

        assert!(processor.profile().is_homogeneous());

        let result = feed(
            &mut processor,
            [
                MockRowBuilder::new()
                    .decimal_str("100.50")
                    .decimal_str("200.75")
                    .build(),
                MockRowBuilder::new()
                    .decimal_str("300.25")
                    .decimal_str("400.99")
                    .build(),
            ],
        );

        expect_batch(result, 2, 2);
    }

    #[test]
    fn test_processor_homogeneous_utf8() {
        let schema = uniform_schema(&DataType::Utf8, 3, true);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(2));

        assert!(processor.profile().is_homogeneous());

        let result = feed(
            &mut processor,
            [
                MockRowBuilder::new()
                    .string("alice")
                    .string("bob")
                    .string("charlie")
                    .build(),
                MockRowBuilder::new()
                    .string("diana")
                    .string("eve")
                    .string("frank")
                    .build(),
            ],
        );

        expect_batch(result, 2, 3);
    }

    #[test]
    fn test_processor_mixed_schema() {
        let schema = schema_of(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("active", DataType::Boolean, false),
        ]);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(2));

        assert!(!processor.profile().is_homogeneous());

        let result = feed(
            &mut processor,
            [
                MockRowBuilder::new()
                    .bigint(1)
                    .string("Alice")
                    .boolean(true)
                    .build(),
                MockRowBuilder::new()
                    .bigint(2)
                    .string("Bob")
                    .boolean(false)
                    .build(),
            ],
        );

        expect_batch(result, 2, 3);
    }

    // ---------------------------------------------------------------------------------
    // Homogeneous fast paths with nulls
    // ---------------------------------------------------------------------------------

    #[test]
    fn test_processor_homogeneous_with_nulls() {
        let schema = uniform_schema(&DataType::Int64, 2, true);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(2));

        assert!(processor.profile().is_homogeneous());

        let result = feed(
            &mut processor,
            [
                MockRowBuilder::new().bigint(1).null().build(),
                MockRowBuilder::new().null().bigint(2).build(),
            ],
        );

        let batch = expect_batch(result, 2, 2);
        assert_null_counts(&batch, &[1, 1]);
    }

    #[test]
    fn test_processor_homogeneous_int32_with_nulls() {
        let schema = uniform_schema(&DataType::Int32, 3, true);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(3));

        assert!(processor.profile().is_homogeneous());

        let result = feed(
            &mut processor,
            [
                MockRowBuilder::new().int(1).null().int(3).build(),
                MockRowBuilder::new().null().int(5).null().build(),
                MockRowBuilder::new().int(7).int(8).null().build(),
            ],
        );

        let batch = expect_batch(result, 3, 3);
        assert_null_counts(&batch, &[1, 1, 2]);
    }

    #[test]
    fn test_processor_homogeneous_float64_with_nulls() {
        let schema = uniform_schema(&DataType::Float64, 2, true);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(2));

        assert!(processor.profile().is_homogeneous());

        let result = feed(
            &mut processor,
            [
                MockRowBuilder::new().double(1.5).null().build(),
                MockRowBuilder::new().null().double(3.5).build(),
            ],
        );

        let batch = expect_batch(result, 2, 2);
        assert_null_counts(&batch, &[1, 1]);
    }

    #[test]
    fn test_processor_homogeneous_decimal128_with_nulls() {
        let schema = uniform_schema(&DataType::Decimal128(18, 2), 2, true);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(2));

        assert!(processor.profile().is_homogeneous());

        let result = feed(
            &mut processor,
            [
                MockRowBuilder::new().decimal_str("100.50").null().build(),
                MockRowBuilder::new().null().decimal_str("400.99").build(),
            ],
        );

        let batch = expect_batch(result, 2, 2);
        assert_null_counts(&batch, &[1, 1]);
    }

    #[test]
    fn test_processor_homogeneous_utf8_with_nulls() {
        let schema = uniform_schema(&DataType::Utf8, 2, true);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(2));

        assert!(processor.profile().is_homogeneous());

        let result = feed(
            &mut processor,
            [
                MockRowBuilder::new().string("hello").null().build(),
                MockRowBuilder::new().null().string("world").build(),
            ],
        );

        let batch = expect_batch(result, 2, 2);
        assert_null_counts(&batch, &[1, 1]);
    }

    // ---------------------------------------------------------------------------------
    // Wide schemas, fallback and repeated batches
    // ---------------------------------------------------------------------------------

    #[test]
    fn test_processor_homogeneous_wide_schema() {
        let schema = uniform_schema(&DataType::Int64, 100, false);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(1));

        assert!(processor.profile().is_homogeneous());

        let row = (0..100_i64)
            .fold(MockRowBuilder::new(), |builder, i| builder.bigint(i))
            .build();

        let result = processor.process_row_generic(&row).unwrap();
        expect_batch(result, 1, 100);
    }

    #[test]
    fn test_processor_homogeneous_unsupported_type_fallback() {
        let schema = uniform_schema(&DataType::Boolean, 2, false);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(2));

        assert!(processor.profile().is_homogeneous());

        // Boolean has no specialized loop, so these rows exercise the generic
        // per-column path behind a homogeneous profile.
        let result = feed(
            &mut processor,
            [
                MockRowBuilder::new().boolean(true).boolean(false).build(),
                MockRowBuilder::new().boolean(false).boolean(true).build(),
            ],
        );

        expect_batch(result, 2, 2);
    }

    #[test]
    fn test_processor_multiple_batches_homogeneous() {
        let schema = uniform_schema(&DataType::Int64, 2, false);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(2));

        assert!(processor.profile().is_homogeneous());

        let batch1 = feed(
            &mut processor,
            [
                MockRowBuilder::new().bigint(1).bigint(2).build(),
                MockRowBuilder::new().bigint(3).bigint(4).build(),
            ],
        );
        assert!(batch1.is_some());

        let batch2 = feed(
            &mut processor,
            [
                MockRowBuilder::new().bigint(5).bigint(6).build(),
                MockRowBuilder::new().bigint(7).bigint(8).build(),
            ],
        );
        assert!(batch2.is_some());

        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_homogeneous_int32_wide() {
        let schema = uniform_schema(&DataType::Int32, 50, false);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(1));

        assert!(processor.profile().is_homogeneous());

        let row = (0..50_i32)
            .fold(MockRowBuilder::new(), |builder, i| builder.int(i))
            .build();

        let result = processor.process_row_generic(&row).unwrap();
        expect_batch(result, 1, 50);
    }

    #[test]
    fn test_processor_homogeneous_float64_wide() {
        let schema = uniform_schema(&DataType::Float64, 30, false);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(1));

        assert!(processor.profile().is_homogeneous());

        let row = (0..30_i32)
            .fold(MockRowBuilder::new(), |builder, i| {
                builder.double(f64::from(i) * 1.5)
            })
            .build();

        let result = processor.process_row_generic(&row).unwrap();
        expect_batch(result, 1, 30);
    }

    #[test]
    fn test_processor_homogeneous_utf8_wide() {
        let schema = uniform_schema(&DataType::Utf8, 20, true);
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(1));

        assert!(processor.profile().is_homogeneous());

        let row = (0..20)
            .fold(MockRowBuilder::new(), |builder, i| {
                builder.string(&format!("value{i}"))
            })
            .build();

        let result = processor.process_row_generic(&row).unwrap();
        expect_batch(result, 1, 20);
    }
}
1326}