hdbconnect_arrow/conversion/
processor.rs1use std::sync::Arc;
6
7use arrow_array::RecordBatch;
8use arrow_schema::SchemaRef;
9
10use crate::Result;
11use crate::builders::factory::BuilderFactory;
12use crate::traits::builder::HanaCompatibleBuilder;
13use crate::traits::streaming::BatchConfig;
14
15pub struct HanaBatchProcessor {
42 schema: SchemaRef,
43 config: BatchConfig,
44 builders: Vec<Box<dyn HanaCompatibleBuilder>>,
45 row_count: usize,
46}
47
48impl std::fmt::Debug for HanaBatchProcessor {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 f.debug_struct("HanaBatchProcessor")
51 .field("schema", &self.schema)
52 .field("config", &self.config)
53 .field("builders", &format!("[{} builders]", self.builders.len()))
54 .field("row_count", &self.row_count)
55 .finish()
56 }
57}
58
59impl HanaBatchProcessor {
60 #[must_use]
67 pub fn new(schema: SchemaRef, config: BatchConfig) -> Self {
68 let factory = BuilderFactory::from_config(&config);
69 let builders = factory.create_builders_for_schema(&schema);
70
71 Self {
72 schema,
73 config,
74 builders,
75 row_count: 0,
76 }
77 }
78
79 #[must_use]
81 pub fn with_defaults(schema: SchemaRef) -> Self {
82 Self::new(schema, BatchConfig::default())
83 }
84
85 pub fn process_row(&mut self, row: &hdbconnect::Row) -> Result<Option<RecordBatch>> {
94 if row.len() != self.builders.len() {
96 return Err(crate::ArrowConversionError::schema_mismatch(
97 self.builders.len(),
98 row.len(),
99 ));
100 }
101
102 for (i, builder) in self.builders.iter_mut().enumerate() {
104 let value = &row[i];
106
107 match value {
108 hdbconnect::HdbValue::NULL => builder.append_null(),
109 v => builder.append_hana_value(v)?,
110 }
111 }
112
113 self.row_count += 1;
114
115 if self.row_count >= self.config.batch_size {
117 return Ok(Some(self.finish_current_batch()?));
118 }
119
120 Ok(None)
121 }
122
123 pub fn flush(&mut self) -> Result<Option<RecordBatch>> {
129 if self.row_count == 0 {
130 return Ok(None);
131 }
132
133 Ok(Some(self.finish_current_batch()?))
134 }
135
136 #[must_use]
138 pub fn schema(&self) -> SchemaRef {
139 Arc::clone(&self.schema)
140 }
141
142 #[must_use]
144 pub const fn buffered_rows(&self) -> usize {
145 self.row_count
146 }
147
148 fn finish_current_batch(&mut self) -> Result<RecordBatch> {
154 let arrays: Vec<_> = self.builders.iter_mut().map(|b| b.finish()).collect();
156
157 let batch = RecordBatch::try_new(Arc::clone(&self.schema), arrays)
159 .map_err(|e| crate::ArrowConversionError::value_conversion("batch", e.to_string()))?;
160
161 let factory = BuilderFactory::from_config(&self.config);
163 self.builders = factory.create_builders_for_schema(&self.schema);
164 self.row_count = 0;
165
166 Ok(batch)
167 }
168}
169
#[cfg(test)]
mod tests {
    use arrow_schema::{DataType, Field, Schema};

    use super::*;

    /// A freshly created processor has nothing buffered and hands back the
    /// schema it was constructed with.
    #[test]
    fn test_processor_creation() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let config = BatchConfig::with_batch_size(100);

        let processor = HanaBatchProcessor::new(Arc::clone(&schema), config);
        assert_eq!(processor.buffered_rows(), 0);
        assert!(Arc::ptr_eq(&processor.schema(), &schema));
    }

    // The next two tests need `hdbconnect::Row` values to feed `process_row`.
    // They are marked ignored so that an empty body is not reported as a
    // passing test.

    #[test]
    #[ignore = "needs an hdbconnect::Row fixture to drive process_row"]
    fn test_processor_buffering() {}

    #[test]
    #[ignore = "needs an hdbconnect::Row fixture to drive process_row"]
    fn test_processor_batch_emission() {}

    /// Flushing with no buffered rows yields no batch and leaves the
    /// processor empty.
    #[test]
    fn test_processor_flush() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let mut processor = HanaBatchProcessor::new(schema, BatchConfig::with_batch_size(100));

        assert!(matches!(processor.flush(), Ok(None)));
        assert_eq!(processor.buffered_rows(), 0);
    }
}