hdbconnect_arrow/traits/
streaming.rs1use std::num::NonZeroUsize;
27
28use arrow_array::RecordBatch;
29use arrow_schema::SchemaRef;
30
31pub trait LendingBatchIterator {
39 type Item<'a>
43 where
44 Self: 'a;
45
46 fn next_batch(&mut self) -> Option<crate::Result<Self::Item<'_>>>;
50
51 fn schema(&self) -> SchemaRef;
53
54 fn size_hint(&self) -> (usize, Option<usize>) {
59 (0, None)
60 }
61}
62
63pub trait BatchProcessor {
68 type Config;
70
71 type Error: std::error::Error;
73
74 type Batch<'a>
76 where
77 Self: 'a;
78
79 fn new(config: Self::Config, schema: SchemaRef) -> Self;
81
82 fn process<'a>(&'a mut self, rows: &[hdbconnect::Row]) -> Result<Self::Batch<'a>, Self::Error>;
88
89 fn flush(&mut self) -> Result<Option<RecordBatch>, Self::Error>;
95}
96
/// Configuration values for batch construction.
///
/// Holds a non-zero batch size, two capacity values and a type-coercion flag.
/// How each value is interpreted is decided by the code that consumes the
/// configuration, which is not part of this definition.
///
/// The type is comparable with `==` so configurations can be checked for
/// equality directly instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchConfig {
    /// Batch size; the `NonZeroUsize` type rules out zero.
    ///
    /// NOTE(review): presumably the number of rows per batch; confirm with
    /// the consumer.
    pub batch_size: NonZeroUsize,

    /// Capacity value for string data.
    ///
    /// NOTE(review): unit (bytes vs. elements) is not visible here; confirm
    /// with the consumer.
    pub string_capacity: usize,

    /// Capacity value for binary data.
    ///
    /// NOTE(review): unit (bytes vs. elements) is not visible here; confirm
    /// with the consumer.
    pub binary_capacity: usize,

    /// Whether type coercion is enabled; the exact coercion rules are defined
    /// by the consumer.
    pub coerce_types: bool,
}
126
127impl Default for BatchConfig {
128 fn default() -> Self {
129 Self {
130 batch_size: NonZeroUsize::new(65536).unwrap(),
132 string_capacity: 1024 * 1024, binary_capacity: 1024 * 1024, coerce_types: false,
135 }
136 }
137}
138
139impl BatchConfig {
140 #[must_use]
146 pub fn with_batch_size(batch_size: usize) -> Self {
147 Self {
148 batch_size: NonZeroUsize::new(batch_size).expect("batch_size must be non-zero"),
149 ..Default::default()
150 }
151 }
152
153 #[must_use]
157 pub fn try_with_batch_size(batch_size: usize) -> Option<Self> {
158 Some(Self {
159 batch_size: NonZeroUsize::new(batch_size)?,
160 ..Default::default()
161 })
162 }
163
164 #[must_use]
166 pub const fn batch_size_usize(&self) -> usize {
167 self.batch_size.get()
168 }
169
170 #[must_use]
172 pub const fn string_capacity(mut self, capacity: usize) -> Self {
173 self.string_capacity = capacity;
174 self
175 }
176
177 #[must_use]
179 pub const fn binary_capacity(mut self, capacity: usize) -> Self {
180 self.binary_capacity = capacity;
181 self
182 }
183
184 #[must_use]
186 pub const fn coerce_types(mut self, coerce: bool) -> Self {
187 self.coerce_types = coerce;
188 self
189 }
190
191 #[must_use]
199 pub const fn small() -> Self {
200 Self {
201 batch_size: match NonZeroUsize::new(1024) {
203 Some(v) => v,
204 None => panic!("batch_size must be non-zero"),
205 },
206 string_capacity: 64 * 1024, binary_capacity: 64 * 1024, coerce_types: false,
209 }
210 }
211
212 #[must_use]
220 pub const fn large() -> Self {
221 Self {
222 batch_size: match NonZeroUsize::new(131_072) {
225 Some(v) => v,
226 None => panic!("batch_size must be non-zero"),
227 },
228 string_capacity: 8 * 1024 * 1024, binary_capacity: 8 * 1024 * 1024, coerce_types: false,
231 }
232 }
233}
234
#[cfg(test)]
mod tests {
    use super::*;

    /// Value `BatchConfig::default()` uses for both capacity fields.
    const DEFAULT_CAPACITY: usize = 1024 * 1024;

    /// Snapshot of all four fields as
    /// `(batch_size, string_capacity, binary_capacity, coerce_types)`.
    fn fields(config: &BatchConfig) -> (usize, usize, usize, bool) {
        (
            config.batch_size.get(),
            config.string_capacity,
            config.binary_capacity,
            config.coerce_types,
        )
    }

    // ---- Default values -------------------------------------------------

    #[test]
    fn test_batch_config_default() {
        let BatchConfig {
            batch_size,
            string_capacity,
            coerce_types,
            ..
        } = BatchConfig::default();

        assert_eq!(batch_size.get(), 65_536);
        assert_eq!(string_capacity, DEFAULT_CAPACITY);
        assert!(!coerce_types);
    }

    #[test]
    fn test_batch_config_default_binary_capacity() {
        assert_eq!(BatchConfig::default().binary_capacity, DEFAULT_CAPACITY);
    }

    // ---- Builder methods ------------------------------------------------

    #[test]
    fn test_batch_config_builder() {
        let base = BatchConfig::with_batch_size(1000);
        let built = base.string_capacity(500).coerce_types(true);

        assert_eq!(built.batch_size.get(), 1000);
        assert_eq!(built.string_capacity, 500);
        assert!(built.coerce_types);
    }

    #[test]
    fn test_batch_config_with_batch_size() {
        let built = BatchConfig::with_batch_size(100);
        assert_eq!(
            fields(&built),
            (100, DEFAULT_CAPACITY, DEFAULT_CAPACITY, false)
        );
    }

    #[test]
    fn test_batch_config_string_capacity() {
        let updated = BatchConfig::default().string_capacity(2048);
        assert_eq!(updated.string_capacity, 2048);
    }

    #[test]
    fn test_batch_config_binary_capacity() {
        let updated = BatchConfig::default().binary_capacity(4096);
        assert_eq!(updated.binary_capacity, 4096);
    }

    #[test]
    fn test_batch_config_coerce_types_true() {
        let enabled = BatchConfig::default().coerce_types(true);
        assert!(enabled.coerce_types);
    }

    #[test]
    fn test_batch_config_coerce_types_false() {
        let disabled = BatchConfig::default().coerce_types(false);
        assert!(!disabled.coerce_types);
    }

    #[test]
    fn test_batch_config_builder_chaining() {
        let built = BatchConfig::with_batch_size(5000)
            .string_capacity(10000)
            .binary_capacity(20000)
            .coerce_types(true);

        assert_eq!(fields(&built), (5000, 10000, 20000, true));
    }

    // ---- Presets --------------------------------------------------------

    #[test]
    fn test_batch_config_presets() {
        assert_eq!(BatchConfig::small().batch_size.get(), 1024);
        assert_eq!(BatchConfig::large().batch_size.get(), 131072);
    }

    #[test]
    fn test_batch_config_small() {
        let preset = BatchConfig::small();
        assert_eq!(fields(&preset), (1024, 64 * 1024, 64 * 1024, false));
    }

    #[test]
    fn test_batch_config_large() {
        let preset = BatchConfig::large();
        assert_eq!(
            fields(&preset),
            (131_072, 8 * 1024 * 1024, 8 * 1024 * 1024, false)
        );
    }

    // ---- Edge cases -----------------------------------------------------

    #[test]
    #[should_panic(expected = "batch_size must be non-zero")]
    fn test_batch_config_zero_batch_size_panics() {
        let _config = BatchConfig::with_batch_size(0);
    }

    #[test]
    fn test_batch_config_try_with_zero_returns_none() {
        let rejected = BatchConfig::try_with_batch_size(0);
        assert!(rejected.is_none());
    }

    #[test]
    fn test_batch_config_try_with_nonzero_returns_some() {
        let size = BatchConfig::try_with_batch_size(100).map(|config| config.batch_size.get());
        assert_eq!(size, Some(100));
    }

    #[test]
    fn test_batch_config_zero_string_capacity() {
        let updated = BatchConfig::default().string_capacity(0);
        assert_eq!(updated.string_capacity, 0);
    }

    #[test]
    fn test_batch_config_zero_binary_capacity() {
        let updated = BatchConfig::default().binary_capacity(0);
        assert_eq!(updated.binary_capacity, 0);
    }

    #[test]
    fn test_batch_config_large_values() {
        let built = BatchConfig::with_batch_size(1_000_000)
            .string_capacity(100_000_000)
            .binary_capacity(100_000_000);

        assert_eq!(built.batch_size.get(), 1_000_000);
        assert_eq!(built.string_capacity, 100_000_000);
        assert_eq!(built.binary_capacity, 100_000_000);
    }

    #[test]
    fn test_batch_config_batch_size_usize() {
        assert_eq!(BatchConfig::with_batch_size(42).batch_size_usize(), 42);
    }

    // ---- Derived traits -------------------------------------------------

    #[test]
    fn test_batch_config_clone() {
        let original = BatchConfig::with_batch_size(100).string_capacity(200);
        let duplicate = original.clone();

        assert_eq!(fields(&original), fields(&duplicate));
    }

    #[test]
    fn test_batch_config_debug() {
        let config = BatchConfig::default();
        let rendered = format!("{config:?}");

        for needle in ["BatchConfig", "batch_size"] {
            assert!(rendered.contains(needle));
        }
    }

    // ---- Overrides ------------------------------------------------------

    #[test]
    fn test_batch_config_override_after_preset() {
        let overridden = BatchConfig::small()
            .string_capacity(1_000_000)
            .coerce_types(true);

        assert_eq!(overridden.batch_size.get(), 1024);
        assert_eq!(overridden.string_capacity, 1_000_000);
        assert!(overridden.coerce_types);
    }

    #[test]
    fn test_batch_config_multiple_overrides() {
        // The last setter call wins.
        let mut config = BatchConfig::default();
        for capacity in [100, 200, 300] {
            config = config.string_capacity(capacity);
        }

        assert_eq!(config.string_capacity, 300);
    }
}