hdbconnect_arrow/conversion/processor.rs

//! Batch processor for streaming conversion of HANA rows to `RecordBatch`es.
//!
//! Implements buffered batch creation with configurable batch size.

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;

use crate::Result;
use crate::builders::factory::BuilderFactory;
use crate::traits::builder::HanaCompatibleBuilder;
use crate::traits::row::RowLike;
use crate::traits::streaming::BatchConfig;

/// Processor that converts HANA rows into Arrow `RecordBatch`es.
///
/// Buffers rows until `batch_size` is reached, then emits a `RecordBatch`.
/// Implements the `BatchProcessor` trait with GAT support.
///
/// # Example
///
/// ```rust,ignore
/// use hdbconnect_arrow::conversion::HanaBatchProcessor;
/// use hdbconnect_arrow::traits::streaming::BatchConfig;
///
/// let schema = /* Arrow schema */;
/// let config = BatchConfig::with_batch_size(10000);
/// let mut processor = HanaBatchProcessor::new(Arc::new(schema), config);
///
/// for row in result_set {
///     if let Some(batch) = processor.process_row(&row)? {
///         // Process batch
///     }
/// }
///
/// // Don't forget to flush remaining rows
/// if let Some(batch) = processor.flush()? {
///     // Process final batch
/// }
/// ```
pub struct HanaBatchProcessor {
    schema: SchemaRef,
    config: BatchConfig,
    builders: Vec<Box<dyn HanaCompatibleBuilder>>,
    row_count: usize,
}

impl std::fmt::Debug for HanaBatchProcessor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HanaBatchProcessor")
            .field("schema", &self.schema)
            .field("config", &self.config)
            .field("builders", &format!("[{} builders]", self.builders.len()))
            .field("row_count", &self.row_count)
            .finish()
    }
}

impl HanaBatchProcessor {
    /// Create a new batch processor.
    ///
    /// # Arguments
    ///
    /// * `schema` - Arrow schema for the batches
    /// * `config` - Batch processing configuration
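    ///
    /// # Example
    ///
    /// A minimal sketch of constructing a processor; the one-column schema
    /// and the batch size of 1024 are only illustrative assumptions.
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use arrow_schema::{DataType, Field, Schema};
    /// use hdbconnect_arrow::conversion::HanaBatchProcessor;
    /// use hdbconnect_arrow::traits::streaming::BatchConfig;
    ///
    /// // One Int32 column; nothing is buffered until rows are processed.
    /// let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    /// let processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(1024));
    /// assert_eq!(processor.buffered_rows(), 0);
    /// ```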
    #[must_use]
    pub fn new(schema: SchemaRef, config: BatchConfig) -> Self {
        let factory = BuilderFactory::from_config(&config);
        let builders = factory.create_builders_for_schema(&schema);

        Self {
            schema,
            config,
            builders,
            row_count: 0,
        }
    }

    /// Create with default configuration.
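    ///
    /// # Example
    ///
    /// A brief sketch; this is shorthand for `Self::new(schema, BatchConfig::default())`.
    ///
    /// ```rust,ignore
    /// let schema = /* Arrow schema */;
    /// let processor = HanaBatchProcessor::with_defaults(Arc::new(schema));
    /// assert_eq!(processor.buffered_rows(), 0);
    /// ```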
    #[must_use]
    pub fn with_defaults(schema: SchemaRef) -> Self {
        Self::new(schema, BatchConfig::default())
    }

    /// Process a single row.
    ///
    /// Returns `Ok(Some(batch))` when a batch is ready, `Ok(None)` when more
    /// rows are needed to fill a batch.
    ///
    /// # Errors
    ///
    /// Returns an error if value conversion fails or the row's column count
    /// does not match the schema.
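    ///
    /// # Example
    ///
    /// A sketch of the usual drain-then-flush loop, assuming `result_set`
    /// yields `hdbconnect::Row` values and `handle` is a hypothetical consumer:
    ///
    /// ```rust,ignore
    /// for row in result_set {
    ///     if let Some(batch) = processor.process_row(&row)? {
    ///         handle(batch); // full batch of `batch_size` rows
    ///     }
    /// }
    /// // Emit whatever is still buffered.
    /// if let Some(batch) = processor.flush()? {
    ///     handle(batch);
    /// }
    /// ```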
    pub fn process_row(&mut self, row: &hdbconnect::Row) -> Result<Option<RecordBatch>> {
        self.process_row_generic(row)
    }

    /// Process a single row using the generic `RowLike` trait.
    ///
    /// This method enables unit testing with `MockRow` instead of requiring
    /// a HANA connection.
    ///
    /// Returns `Ok(Some(batch))` when a batch is ready, `Ok(None)` when more
    /// rows are needed to fill a batch.
    ///
    /// # Errors
    ///
    /// Returns an error if value conversion fails or the row's column count
    /// does not match the schema.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use hdbconnect_arrow::traits::row::{MockRow, MockRowBuilder};
    ///
    /// let row = MockRowBuilder::new().int(42).string("test").build();
    /// let result = processor.process_row_generic(&row)?;
    /// ```
    pub fn process_row_generic<R: RowLike>(&mut self, row: &R) -> Result<Option<RecordBatch>> {
        // Validate column count
        if row.len() != self.builders.len() {
            return Err(crate::ArrowConversionError::schema_mismatch(
                self.builders.len(),
                row.len(),
            ));
        }

        // Append row to builders
        for (i, builder) in self.builders.iter_mut().enumerate() {
            let value = row.get(i);

            match value {
                hdbconnect::HdbValue::NULL => builder.append_null(),
                v => builder.append_hana_value(v)?,
            }
        }

        self.row_count += 1;

        // Check if we've reached batch size
        if self.row_count >= self.config.batch_size.get() {
            return Ok(Some(self.finish_current_batch()?));
        }

        Ok(None)
    }

    /// Flush any remaining rows as a final batch.
    ///
    /// # Errors
    ///
    /// Returns an error if `RecordBatch` creation fails.
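    ///
    /// # Example
    ///
    /// A sketch of draining the final partial batch once the row loop has
    /// finished (`handle` is a hypothetical consumer):
    ///
    /// ```rust,ignore
    /// if let Some(batch) = processor.flush()? {
    ///     // Fewer than `batch_size` rows were left buffered.
    ///     handle(batch);
    /// }
    /// assert_eq!(processor.buffered_rows(), 0);
    /// ```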
    pub fn flush(&mut self) -> Result<Option<RecordBatch>> {
        if self.row_count == 0 {
            return Ok(None);
        }

        Ok(Some(self.finish_current_batch()?))
    }

    /// Returns the schema of batches produced by this processor.
    #[must_use]
    pub fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Returns the current row count in the buffer.
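    ///
    /// # Example
    ///
    /// A brief sketch, assuming `batch_size` is greater than one so the first
    /// row does not immediately emit a batch:
    ///
    /// ```rust,ignore
    /// assert_eq!(processor.buffered_rows(), 0);
    /// processor.process_row(&row)?;
    /// assert_eq!(processor.buffered_rows(), 1);
    /// ```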
    #[must_use]
    pub const fn buffered_rows(&self) -> usize {
        self.row_count
    }

    /// Finish the current batch and reset builders.
    ///
    /// Arrow builders reset their internal state after `finish()`, keeping
    /// allocated capacity for the next batch. This avoids heap allocations
    /// at batch boundaries.
    ///
    /// # Errors
    ///
    /// Returns an error if `RecordBatch` creation fails.
    fn finish_current_batch(&mut self) -> Result<RecordBatch> {
        // Finish all builders to get arrays.
        // Note: Arrow builders reset after finish() and retain capacity.
        let arrays: Vec<_> = self.builders.iter_mut().map(|b| b.finish()).collect();

        // Create RecordBatch
        let batch = RecordBatch::try_new(Arc::clone(&self.schema), arrays)
            .map_err(|e| crate::ArrowConversionError::value_conversion("batch", e.to_string()))?;

        // Arrow builders are already reset after finish() - just reset row count
        self.row_count = 0;

        Ok(batch)
    }
}

#[cfg(test)]
mod tests {
    use arrow_schema::{DataType, Field, Schema};

    use super::*;

    // ═══════════════════════════════════════════════════════════════════════════
    // Processor Creation Tests
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_processor_creation() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let config = BatchConfig::with_batch_size(100);

        let processor = HanaBatchProcessor::new(schema, config);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_defaults() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_schema() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let processor = HanaBatchProcessor::with_defaults(Arc::clone(&schema));

        let returned_schema = processor.schema();
        assert_eq!(returned_schema.fields().len(), 2);
        assert_eq!(returned_schema.field(0).name(), "id");
        assert_eq!(returned_schema.field(1).name(), "name");
    }

    #[test]
    fn test_processor_initial_buffered_rows() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // Processor with Different Configs
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_processor_with_small_batch_size() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let config = BatchConfig::with_batch_size(10);
        let processor = HanaBatchProcessor::new(schema, config);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_large_batch_size() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let config = BatchConfig::with_batch_size(100000);
        let processor = HanaBatchProcessor::new(schema, config);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_custom_config() {
        let schema = Arc::new(Schema::new(vec![Field::new("data", DataType::Utf8, true)]));
        let config = BatchConfig::with_batch_size(500)
            .string_capacity(10000)
            .binary_capacity(5000);
        let processor = HanaBatchProcessor::new(schema, config);
        assert_eq!(processor.buffered_rows(), 0);
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // Processor with Different Schema Types
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_processor_with_empty_schema() {
        let fields: Vec<Field> = vec![];
        let schema = Arc::new(Schema::new(fields));
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_single_column_schema() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_multi_column_schema() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("price", DataType::Decimal128(18, 2), false),
            Field::new("is_active", DataType::Boolean, false),
        ]));
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_all_numeric_types() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("tiny", DataType::UInt8, false),
            Field::new("small", DataType::Int16, false),
            Field::new("int", DataType::Int32, false),
            Field::new("big", DataType::Int64, false),
            Field::new("real", DataType::Float32, false),
            Field::new("double", DataType::Float64, false),
        ]));
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_string_types() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("small_str", DataType::Utf8, true),
            Field::new("large_str", DataType::LargeUtf8, true),
        ]));
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_with_binary_types() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("bin", DataType::Binary, true),
            Field::new("large_bin", DataType::LargeBinary, true),
            Field::new("fixed_bin", DataType::FixedSizeBinary(16), true),
        ]));
        let processor = HanaBatchProcessor::with_defaults(schema);
        assert_eq!(processor.buffered_rows(), 0);
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // Flush Tests (without rows - tests empty flush)
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_processor_flush_empty() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let mut processor = HanaBatchProcessor::with_defaults(schema);

        let result = processor.flush();
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[test]
    fn test_processor_flush_multiple_times_when_empty() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let mut processor = HanaBatchProcessor::with_defaults(schema);

        assert!(processor.flush().unwrap().is_none());
        assert!(processor.flush().unwrap().is_none());
        assert!(processor.flush().unwrap().is_none());
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // Debug Tests
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_processor_debug() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let processor = HanaBatchProcessor::with_defaults(schema);

        let debug_str = format!("{:?}", processor);
        assert!(debug_str.contains("HanaBatchProcessor"));
        assert!(debug_str.contains("row_count"));
        assert!(debug_str.contains("builders"));
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // Schema Ref Tests
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_processor_schema_returns_same_schema() {
        let original_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("value", DataType::Float64, true),
        ]));
        let processor = HanaBatchProcessor::with_defaults(Arc::clone(&original_schema));

        let schema1 = processor.schema();
        let schema2 = processor.schema();

        assert!(Arc::ptr_eq(&schema1, &schema2));
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // MockRow Tests (unit testing without HANA connection)
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_process_row_generic_with_mock_row() {
        use crate::traits::row::MockRowBuilder;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let config = BatchConfig::with_batch_size(10);
        let mut processor = HanaBatchProcessor::new(schema, config);

        let row = MockRowBuilder::new().int(42).string("test").build();

        let result = processor.process_row_generic(&row);
        assert!(result.is_ok());
        assert!(result.unwrap().is_none()); // Not enough rows for batch
        assert_eq!(processor.buffered_rows(), 1);
    }

    #[test]
    fn test_process_row_generic_batch_ready() {
        use crate::traits::row::MockRowBuilder;

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let config = BatchConfig::with_batch_size(3);
        let mut processor = HanaBatchProcessor::new(schema, config);

        // Add rows until batch is ready
        for i in 0..3 {
            let row = MockRowBuilder::new().int(i).build();
            let result = processor.process_row_generic(&row).unwrap();
            if i < 2 {
                assert!(result.is_none());
            } else {
                // Third row should trigger batch
                let batch = result.expect("batch should be ready");
                assert_eq!(batch.num_rows(), 3);
            }
        }
    }

    #[test]
    fn test_process_row_generic_with_nulls() {
        use crate::traits::row::MockRowBuilder;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]));
        let config = BatchConfig::with_batch_size(2);
        let mut processor = HanaBatchProcessor::new(schema, config);

        // Row with null values
        let row = MockRowBuilder::new().null().null().build();

        let result = processor.process_row_generic(&row);
        assert!(result.is_ok());
        assert_eq!(processor.buffered_rows(), 1);
    }

    #[test]
    fn test_process_row_generic_schema_mismatch() {
        use crate::traits::row::MockRowBuilder;

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let mut processor = HanaBatchProcessor::with_defaults(schema);

        // Row with wrong number of columns
        let row = MockRowBuilder::new().int(1).string("extra").build();

        let result = processor.process_row_generic(&row);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.is_schema_mismatch());
    }

    #[test]
    fn test_process_row_generic_flush() {
        use crate::traits::row::MockRowBuilder;

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let config = BatchConfig::with_batch_size(100);
        let mut processor = HanaBatchProcessor::new(schema, config);

        // Add some rows (less than batch size)
        for i in 0..5 {
            let row = MockRowBuilder::new().int(i).build();
            processor.process_row_generic(&row).unwrap();
        }

        assert_eq!(processor.buffered_rows(), 5);

        // Flush remaining rows
        let batch = processor
            .flush()
            .unwrap()
            .expect("should have remaining rows");
        assert_eq!(batch.num_rows(), 5);
        assert_eq!(processor.buffered_rows(), 0);
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // Builder Reuse Tests
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_builder_reuse_after_finish() {
        use crate::traits::row::MockRowBuilder;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let config = BatchConfig::with_batch_size(2);
        let mut processor = HanaBatchProcessor::new(schema, config);

        // Process first batch
        for i in 0..2 {
            let row = MockRowBuilder::new().int(i).string("test").build();
            let result = processor.process_row_generic(&row).unwrap();
            if i == 1 {
                assert!(result.is_some(), "First batch should be ready");
            }
        }

        // Verify processor can continue processing (builders reused)
        for i in 2..4 {
            let row = MockRowBuilder::new().int(i).string("test2").build();
            let result = processor.process_row_generic(&row).unwrap();
            if i == 3 {
                let batch = result.expect("Second batch should be ready");
                assert_eq!(batch.num_rows(), 2);
                // Verify data is from second batch, not first
                let id_array = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<arrow_array::Int32Array>()
                    .unwrap();
                assert_eq!(id_array.value(0), 2);
                assert_eq!(id_array.value(1), 3);
            }
        }
    }
}