1use std::sync::Arc;
6
7use arrow_array::RecordBatch;
8use arrow_schema::SchemaRef;
9
10use crate::Result;
11use crate::builders::factory::BuilderFactory;
12use crate::traits::builder::HanaCompatibleBuilder;
13use crate::traits::row::RowLike;
14use crate::traits::streaming::BatchConfig;
15
16pub struct HanaBatchProcessor {
43 schema: SchemaRef,
44 config: BatchConfig,
45 builders: Vec<Box<dyn HanaCompatibleBuilder>>,
46 row_count: usize,
47}
48
49impl std::fmt::Debug for HanaBatchProcessor {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 f.debug_struct("HanaBatchProcessor")
52 .field("schema", &self.schema)
53 .field("config", &self.config)
54 .field("builders", &format!("[{} builders]", self.builders.len()))
55 .field("row_count", &self.row_count)
56 .finish()
57 }
58}
59
60impl HanaBatchProcessor {
61 #[must_use]
68 pub fn new(schema: SchemaRef, config: BatchConfig) -> Self {
69 let factory = BuilderFactory::from_config(&config);
70 let builders = factory.create_builders_for_schema(&schema);
71
72 Self {
73 schema,
74 config,
75 builders,
76 row_count: 0,
77 }
78 }
79
80 #[must_use]
82 pub fn with_defaults(schema: SchemaRef) -> Self {
83 Self::new(schema, BatchConfig::default())
84 }
85
86 pub fn process_row(&mut self, row: &hdbconnect::Row) -> Result<Option<RecordBatch>> {
95 self.process_row_generic(row)
96 }
97
98 pub fn process_row_generic<R: RowLike>(&mut self, row: &R) -> Result<Option<RecordBatch>> {
119 if row.len() != self.builders.len() {
121 return Err(crate::ArrowConversionError::schema_mismatch(
122 self.builders.len(),
123 row.len(),
124 ));
125 }
126
127 for (i, builder) in self.builders.iter_mut().enumerate() {
129 let value = row.get(i);
130
131 match value {
132 hdbconnect::HdbValue::NULL => builder.append_null(),
133 v => builder.append_hana_value(v)?,
134 }
135 }
136
137 self.row_count += 1;
138
139 if self.row_count >= self.config.batch_size.get() {
141 return Ok(Some(self.finish_current_batch()?));
142 }
143
144 Ok(None)
145 }
146
147 pub fn flush(&mut self) -> Result<Option<RecordBatch>> {
153 if self.row_count == 0 {
154 return Ok(None);
155 }
156
157 Ok(Some(self.finish_current_batch()?))
158 }
159
160 #[must_use]
162 pub fn schema(&self) -> SchemaRef {
163 Arc::clone(&self.schema)
164 }
165
166 #[must_use]
168 pub const fn buffered_rows(&self) -> usize {
169 self.row_count
170 }
171
172 fn finish_current_batch(&mut self) -> Result<RecordBatch> {
182 let arrays: Vec<_> = self.builders.iter_mut().map(|b| b.finish()).collect();
185
186 let batch = RecordBatch::try_new(Arc::clone(&self.schema), arrays)
188 .map_err(|e| crate::ArrowConversionError::value_conversion("batch", e.to_string()))?;
189
190 self.row_count = 0;
192
193 Ok(batch)
194 }
195}
196
#[cfg(test)]
mod tests {
    use arrow_schema::{DataType, Field, Schema};

    use super::*;

    // ---- construction -------------------------------------------------------

    #[test]
    fn test_processor_creation() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let config = BatchConfig::with_batch_size(100);

        let processor = HanaBatchProcessor::new(schema, config);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_defaults() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
        assert_eq!(processor.schema().fields().len(), 1);
    }

    #[test]
    fn test_processor_schema() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let processor = HanaBatchProcessor::with_defaults(Arc::clone(&schema));

        let returned_schema = processor.schema();
        assert_eq!(returned_schema.fields().len(), 2);
        assert_eq!(returned_schema.field(0).name(), "id");
        assert_eq!(returned_schema.field(1).name(), "name");
    }

    #[test]
    fn test_processor_initial_buffered_rows() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_small_batch_size() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let config = BatchConfig::with_batch_size(10);
        let processor = HanaBatchProcessor::new(schema, config);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_large_batch_size() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let config = BatchConfig::with_batch_size(100000);
        let processor = HanaBatchProcessor::new(schema, config);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_custom_config() {
        let schema = Arc::new(Schema::new(vec![Field::new("data", DataType::Utf8, true)]));
        let config = BatchConfig::with_batch_size(500)
            .string_capacity(10000)
            .binary_capacity(5000);
        let processor = HanaBatchProcessor::new(schema, config);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_empty_schema() {
        let fields: Vec<Field> = vec![];
        let schema = Arc::new(Schema::new(fields));
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_single_column_schema() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_multi_column_schema() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("price", DataType::Decimal128(18, 2), false),
            Field::new("is_active", DataType::Boolean, false),
        ]));
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_all_numeric_types() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("tiny", DataType::UInt8, false),
            Field::new("small", DataType::Int16, false),
            Field::new("int", DataType::Int32, false),
            Field::new("big", DataType::Int64, false),
            Field::new("real", DataType::Float32, false),
            Field::new("double", DataType::Float64, false),
        ]));
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_string_types() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("small_str", DataType::Utf8, true),
            Field::new("large_str", DataType::LargeUtf8, true),
        ]));
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_binary_types() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("bin", DataType::Binary, true),
            Field::new("large_bin", DataType::LargeBinary, true),
            Field::new("fixed_bin", DataType::FixedSizeBinary(16), true),
        ]));
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    // ---- flush ---------------------------------------------------------------

    #[test]
    fn test_processor_flush_empty() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let mut processor = HanaBatchProcessor::with_defaults(schema);

        let result = processor.flush();
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[test]
    fn test_processor_flush_multiple_times_when_empty() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let mut processor = HanaBatchProcessor::with_defaults(schema);

        assert!(processor.flush().unwrap().is_none());
        assert!(processor.flush().unwrap().is_none());
        assert!(processor.flush().unwrap().is_none());
    }

    // ---- Debug / schema accessors -------------------------------------------

    #[test]
    fn test_processor_debug() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let processor = HanaBatchProcessor::with_defaults(schema);

        let debug_str = format!("{:?}", processor);
        assert!(debug_str.contains("HanaBatchProcessor"));
        assert!(debug_str.contains("row_count"));
        assert!(debug_str.contains("builders"));
        // Builders are summarised by count rather than printed individually.
        assert!(debug_str.contains("[1 builders]"));
    }

    #[test]
    fn test_processor_schema_returns_same_schema() {
        let original_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("value", DataType::Float64, true),
        ]));
        let processor = HanaBatchProcessor::with_defaults(Arc::clone(&original_schema));

        let schema1 = processor.schema();
        let schema2 = processor.schema();

        assert!(Arc::ptr_eq(&schema1, &schema2));
    }

    // ---- row processing via MockRowBuilder ----------------------------------

    #[test]
    fn test_process_row_generic_with_mock_row() {
        use crate::traits::row::MockRowBuilder;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let config = BatchConfig::with_batch_size(10);
        let mut processor = HanaBatchProcessor::new(schema, config);

        let row = MockRowBuilder::new().int(42).string("test").build();

        let result = processor.process_row_generic(&row);
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
        assert_eq!(processor.buffered_rows(), 1);
    }

    #[test]
    fn test_process_row_generic_batch_ready() {
        use crate::traits::row::MockRowBuilder;

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let config = BatchConfig::with_batch_size(3);
        let mut processor = HanaBatchProcessor::new(schema, config);

        for i in 0..3 {
            let row = MockRowBuilder::new().int(i).build();
            let result = processor.process_row_generic(&row).unwrap();
            if i < 2 {
                assert!(result.is_none());
            } else {
                let batch = result.expect("batch should be ready");
                assert_eq!(batch.num_rows(), 3);
            }
        }

        // Emitting a full batch empties the buffer.
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_process_row_generic_with_nulls() {
        use crate::traits::row::MockRowBuilder;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]));
        let config = BatchConfig::with_batch_size(2);
        let mut processor = HanaBatchProcessor::new(schema, config);

        let row = MockRowBuilder::new().null().null().build();

        let result = processor.process_row_generic(&row);
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
        assert_eq!(processor.buffered_rows(), 1);

        // The NULLs must surface as real Arrow nulls in the flushed batch.
        let batch = processor
            .flush()
            .unwrap()
            .expect("the buffered null row should be flushed");
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.column(0).null_count(), 1);
        assert_eq!(batch.column(1).null_count(), 1);
    }

    #[test]
    fn test_process_row_generic_schema_mismatch() {
        use crate::traits::row::MockRowBuilder;

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let mut processor = HanaBatchProcessor::with_defaults(schema);

        let row = MockRowBuilder::new().int(1).string("extra").build();

        let result = processor.process_row_generic(&row);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.is_schema_mismatch());

        // The length check happens before anything is appended.
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_process_row_generic_flush() {
        use crate::traits::row::MockRowBuilder;

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let config = BatchConfig::with_batch_size(100);
        let mut processor = HanaBatchProcessor::new(schema, config);

        for i in 0..5 {
            let row = MockRowBuilder::new().int(i).build();
            processor.process_row_generic(&row).unwrap();
        }

        assert_eq!(processor.buffered_rows(), 5);

        let batch = processor
            .flush()
            .unwrap()
            .expect("should have remaining rows");
        assert_eq!(batch.num_rows(), 5);
        assert_eq!(processor.buffered_rows(), 0);
    }

    /// Builders must be reset after each batch so consecutive batches never
    /// carry over values from the previous one.
    #[test]
    fn test_builder_reuse_after_finish() {
        use crate::traits::row::MockRowBuilder;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let config = BatchConfig::with_batch_size(2);
        let mut processor = HanaBatchProcessor::new(schema, config);

        for i in 0..2 {
            let row = MockRowBuilder::new().int(i).string("test").build();
            let result = processor.process_row_generic(&row).unwrap();
            if i == 1 {
                let batch = result.expect("First batch should be ready");
                assert_eq!(batch.num_rows(), 2);
                let id_array = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<arrow_array::Int32Array>()
                    .unwrap();
                assert_eq!(id_array.value(0), 0);
                assert_eq!(id_array.value(1), 1);
            }
        }

        for i in 2..4 {
            let row = MockRowBuilder::new().int(i).string("test2").build();
            let result = processor.process_row_generic(&row).unwrap();
            if i == 3 {
                let batch = result.expect("Second batch should be ready");
                assert_eq!(batch.num_rows(), 2);
                let id_array = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<arrow_array::Int32Array>()
                    .unwrap();
                assert_eq!(id_array.value(0), 2);
                assert_eq!(id_array.value(1), 3);
            }
        }
    }
}