hdbconnect_arrow/conversion/processor.rs

//! Batch processor for streaming conversion of HANA rows to `RecordBatch`es.
//!
//! Implements buffered batch creation with a configurable batch size.

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;

use crate::Result;
use crate::builders::factory::BuilderFactory;
use crate::traits::builder::HanaCompatibleBuilder;
use crate::traits::streaming::BatchConfig;

/// Processor that converts HANA rows into Arrow `RecordBatch`es.
///
/// Buffers rows until `batch_size` is reached, then emits a `RecordBatch`.
/// Implements the `BatchProcessor` trait with GAT support.
///
/// # Example
///
/// ```rust,ignore
/// use hdbconnect_arrow::conversion::HanaBatchProcessor;
/// use hdbconnect_arrow::traits::streaming::BatchConfig;
///
/// let schema = /* Arrow schema */;
/// let config = BatchConfig::with_batch_size(10000);
/// let mut processor = HanaBatchProcessor::new(Arc::new(schema), config);
///
/// for row in result_set {
///     if let Some(batch) = processor.process_row(&row)? {
///         // Process batch
///     }
/// }
///
/// // Don't forget to flush the remaining rows
/// if let Some(batch) = processor.flush()? {
///     // Process final batch
/// }
/// ```
pub struct HanaBatchProcessor {
    schema: SchemaRef,
    config: BatchConfig,
    builders: Vec<Box<dyn HanaCompatibleBuilder>>,
    row_count: usize,
}

impl std::fmt::Debug for HanaBatchProcessor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HanaBatchProcessor")
            .field("schema", &self.schema)
            .field("config", &self.config)
            .field("builders", &format!("[{} builders]", self.builders.len()))
            .field("row_count", &self.row_count)
            .finish()
    }
}

impl HanaBatchProcessor {
    /// Create a new batch processor.
    ///
    /// # Arguments
    ///
    /// * `schema` - Arrow schema for the batches
    /// * `config` - Batch processing configuration
    #[must_use]
    pub fn new(schema: SchemaRef, config: BatchConfig) -> Self {
        let factory = BuilderFactory::from_config(&config);
        let builders = factory.create_builders_for_schema(&schema);

        Self {
            schema,
            config,
            builders,
            row_count: 0,
        }
    }

    /// Create with the default configuration.
    #[must_use]
    pub fn with_defaults(schema: SchemaRef) -> Self {
        Self::new(schema, BatchConfig::default())
    }

    /// Process a single row.
    ///
    /// Returns `Ok(Some(batch))` when a batch is ready, `Ok(None)` when more
    /// rows are needed to fill a batch.
    ///
    /// # Errors
    ///
    /// Returns an error if value conversion fails or the row's column count
    /// does not match the schema.
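    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `processor`, its `config`, and a `row` from an
    /// `hdbconnect` result set are in scope:
    ///
    /// ```rust,ignore
    /// if let Some(batch) = processor.process_row(&row)? {
    ///     // A full batch is emitted once `batch_size` rows have been buffered.
    ///     assert_eq!(batch.num_rows(), config.batch_size);
    /// }
    /// ```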
    pub fn process_row(&mut self, row: &hdbconnect::Row) -> Result<Option<RecordBatch>> {
        // Validate column count
        if row.len() != self.builders.len() {
            return Err(crate::ArrowConversionError::schema_mismatch(
                self.builders.len(),
                row.len(),
            ));
        }

        // Append row to builders
        for (i, builder) in self.builders.iter_mut().enumerate() {
            // Use index access for row values
            let value = &row[i];

            match value {
                hdbconnect::HdbValue::NULL => builder.append_null(),
                v => builder.append_hana_value(v)?,
            }
        }

        self.row_count += 1;

        // Check if we've reached batch size
        if self.row_count >= self.config.batch_size {
            return Ok(Some(self.finish_current_batch()?));
        }

        Ok(None)
    }

    /// Flush any remaining rows as a final batch.
    ///
    /// # Errors
    ///
    /// Returns an error if `RecordBatch` creation fails.
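    ///
    /// # Example
    ///
    /// A minimal sketch of the end-of-stream pattern, assuming `processor` has
    /// already consumed every row:
    ///
    /// ```rust,ignore
    /// if let Some(batch) = processor.flush()? {
    ///     // Final, possibly partial, batch.
    ///     assert!(batch.num_rows() > 0);
    /// }
    /// ```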
    pub fn flush(&mut self) -> Result<Option<RecordBatch>> {
        if self.row_count == 0 {
            return Ok(None);
        }

        Ok(Some(self.finish_current_batch()?))
    }

    /// Returns the schema of batches produced by this processor.
    #[must_use]
    pub fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Returns the number of rows currently buffered.
    #[must_use]
    pub const fn buffered_rows(&self) -> usize {
        self.row_count
    }

    /// Finish the current batch and reset the builders.
    ///
    /// # Errors
    ///
    /// Returns an error if `RecordBatch` creation fails.
    fn finish_current_batch(&mut self) -> Result<RecordBatch> {
        // Finish all builders to get arrays
        let arrays: Vec<_> = self.builders.iter_mut().map(|b| b.finish()).collect();

        // Create RecordBatch
        let batch = RecordBatch::try_new(Arc::clone(&self.schema), arrays)
            .map_err(|e| crate::ArrowConversionError::value_conversion("batch", e.to_string()))?;

        // Reset builders for next batch
        let factory = BuilderFactory::from_config(&self.config);
        self.builders = factory.create_builders_for_schema(&self.schema);
        self.row_count = 0;

        Ok(batch)
    }
}

#[cfg(test)]
mod tests {
    use arrow_schema::{DataType, Field, Schema};

    use super::*;

    #[test]
    fn test_processor_creation() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let config = BatchConfig::with_batch_size(100);

        let processor = HanaBatchProcessor::new(schema, config);
        assert_eq!(processor.buffered_rows(), 0);
    }

    #[test]
    fn test_processor_buffering() {
        // Note: Requires mock hdbconnect::Row implementation
        // Would test that rows are buffered correctly
    }

    #[test]
    fn test_processor_batch_emission() {
        // Test that batch is emitted when batch_size is reached
    }

    #[test]
    fn test_processor_flush() {
        // Test that flush emits remaining rows
    }
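
    // The tests below are runnable sketches that avoid needing a HANA row;
    // they assume the crate's BuilderFactory supports a simple Int32 column
    // (as in test_processor_creation above).
    #[test]
    fn test_flush_without_rows_returns_none() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let mut processor = HanaBatchProcessor::with_defaults(schema);

        // Nothing buffered, so flush should not emit a batch.
        assert!(matches!(processor.flush(), Ok(None)));
    }

    #[test]
    fn test_schema_accessor_returns_input_schema() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let processor = HanaBatchProcessor::with_defaults(Arc::clone(&schema));

        // The processor reports the same schema it was constructed with.
        assert_eq!(processor.schema(), schema);
    }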
}