hdbconnect_arrow/traits/
streaming.rs1use std::num::NonZeroUsize;
27
28use arrow_array::RecordBatch;
29use arrow_schema::SchemaRef;
30
31pub trait LendingBatchIterator {
39 type Item<'a>
43 where
44 Self: 'a;
45
46 fn next_batch(&mut self) -> Option<crate::Result<Self::Item<'_>>>;
50
51 fn schema(&self) -> SchemaRef;
53
54 fn size_hint(&self) -> (usize, Option<usize>) {
59 (0, None)
60 }
61}
62
63pub trait BatchProcessor {
68 type Config;
70
71 type Error: std::error::Error;
73
74 type Batch<'a>
76 where
77 Self: 'a;
78
79 fn new(config: Self::Config, schema: SchemaRef) -> Self;
81
82 fn process<'a>(&'a mut self, rows: &[hdbconnect::Row]) -> Result<Self::Batch<'a>, Self::Error>;
88
89 fn flush(&mut self) -> Result<Option<RecordBatch>, Self::Error>;
95}
96
97#[derive(Debug, Clone)]
101pub struct BatchConfig {
102 pub batch_size: NonZeroUsize,
107
108 pub string_capacity: usize,
113
114 pub binary_capacity: usize,
119
120 pub coerce_types: bool,
125
126 pub max_lob_bytes: Option<usize>,
134}
135
136impl Default for BatchConfig {
137 fn default() -> Self {
138 Self {
139 batch_size: NonZeroUsize::new(65536).unwrap(),
141 string_capacity: 1024 * 1024, binary_capacity: 1024 * 1024, coerce_types: false,
144 max_lob_bytes: None,
145 }
146 }
147}
148
149impl BatchConfig {
150 #[must_use]
156 pub fn with_batch_size(batch_size: usize) -> Self {
157 Self {
158 batch_size: NonZeroUsize::new(batch_size).expect("batch_size must be non-zero"),
159 ..Default::default()
160 }
161 }
162
163 #[must_use]
167 pub fn try_with_batch_size(batch_size: usize) -> Option<Self> {
168 Some(Self {
169 batch_size: NonZeroUsize::new(batch_size)?,
170 ..Default::default()
171 })
172 }
173
174 #[must_use]
176 pub const fn batch_size_usize(&self) -> usize {
177 self.batch_size.get()
178 }
179
180 #[must_use]
182 pub const fn string_capacity(mut self, capacity: usize) -> Self {
183 self.string_capacity = capacity;
184 self
185 }
186
187 #[must_use]
189 pub const fn binary_capacity(mut self, capacity: usize) -> Self {
190 self.binary_capacity = capacity;
191 self
192 }
193
194 #[must_use]
196 pub const fn coerce_types(mut self, coerce: bool) -> Self {
197 self.coerce_types = coerce;
198 self
199 }
200
201 #[must_use]
206 pub const fn max_lob_bytes(mut self, max: Option<usize>) -> Self {
207 self.max_lob_bytes = max;
208 self
209 }
210
211 #[must_use]
219 pub const fn small() -> Self {
220 Self {
221 batch_size: match NonZeroUsize::new(1024) {
223 Some(v) => v,
224 None => panic!("batch_size must be non-zero"),
225 },
226 string_capacity: 64 * 1024, binary_capacity: 64 * 1024, coerce_types: false,
229 max_lob_bytes: None,
230 }
231 }
232
233 #[must_use]
241 pub const fn large() -> Self {
242 Self {
243 batch_size: match NonZeroUsize::new(131_072) {
246 Some(v) => v,
247 None => panic!("batch_size must be non-zero"),
248 },
249 string_capacity: 8 * 1024 * 1024, binary_capacity: 8 * 1024 * 1024, coerce_types: false,
252 max_lob_bytes: None,
253 }
254 }
255}
256
257#[cfg(test)]
258mod tests {
259 use super::*;
260
261 #[test]
266 fn test_batch_config_default() {
267 let config = BatchConfig::default();
268 assert_eq!(config.batch_size.get(), 65536);
269 assert_eq!(config.string_capacity, 1024 * 1024);
270 assert!(!config.coerce_types);
271 assert!(config.max_lob_bytes.is_none());
272 }
273
274 #[test]
275 fn test_batch_config_default_binary_capacity() {
276 let config = BatchConfig::default();
277 assert_eq!(config.binary_capacity, 1024 * 1024);
278 }
279
280 #[test]
285 fn test_batch_config_builder() {
286 let config = BatchConfig::with_batch_size(1000)
287 .string_capacity(500)
288 .coerce_types(true);
289
290 assert_eq!(config.batch_size.get(), 1000);
291 assert_eq!(config.string_capacity, 500);
292 assert!(config.coerce_types);
293 }
294
295 #[test]
296 fn test_batch_config_with_batch_size() {
297 let config = BatchConfig::with_batch_size(100);
298 assert_eq!(config.batch_size.get(), 100);
299 assert_eq!(config.string_capacity, 1024 * 1024);
300 assert_eq!(config.binary_capacity, 1024 * 1024);
301 assert!(!config.coerce_types);
302 }
303
304 #[test]
305 fn test_batch_config_string_capacity() {
306 let config = BatchConfig::default().string_capacity(2048);
307 assert_eq!(config.string_capacity, 2048);
308 }
309
310 #[test]
311 fn test_batch_config_binary_capacity() {
312 let config = BatchConfig::default().binary_capacity(4096);
313 assert_eq!(config.binary_capacity, 4096);
314 }
315
316 #[test]
317 fn test_batch_config_coerce_types_true() {
318 let config = BatchConfig::default().coerce_types(true);
319 assert!(config.coerce_types);
320 }
321
322 #[test]
323 fn test_batch_config_coerce_types_false() {
324 let config = BatchConfig::default().coerce_types(false);
325 assert!(!config.coerce_types);
326 }
327
328 #[test]
329 fn test_batch_config_builder_chaining() {
330 let config = BatchConfig::with_batch_size(5000)
331 .string_capacity(10000)
332 .binary_capacity(20000)
333 .coerce_types(true);
334
335 assert_eq!(config.batch_size.get(), 5000);
336 assert_eq!(config.string_capacity, 10000);
337 assert_eq!(config.binary_capacity, 20000);
338 assert!(config.coerce_types);
339 }
340
341 #[test]
346 fn test_batch_config_max_lob_bytes_none() {
347 let config = BatchConfig::default();
348 assert!(config.max_lob_bytes.is_none());
349 }
350
351 #[test]
352 fn test_batch_config_max_lob_bytes_some() {
353 let config = BatchConfig::default().max_lob_bytes(Some(50_000_000));
354 assert_eq!(config.max_lob_bytes, Some(50_000_000));
355 }
356
357 #[test]
358 fn test_batch_config_max_lob_bytes_reset_to_none() {
359 let config = BatchConfig::default()
360 .max_lob_bytes(Some(1000))
361 .max_lob_bytes(None);
362 assert!(config.max_lob_bytes.is_none());
363 }
364
365 #[test]
366 fn test_batch_config_max_lob_bytes_chaining() {
367 let config = BatchConfig::with_batch_size(1000)
368 .string_capacity(500)
369 .max_lob_bytes(Some(10_000_000));
370
371 assert_eq!(config.batch_size.get(), 1000);
372 assert_eq!(config.string_capacity, 500);
373 assert_eq!(config.max_lob_bytes, Some(10_000_000));
374 }
375
376 #[test]
381 fn test_batch_config_presets() {
382 let small = BatchConfig::small();
383 assert_eq!(small.batch_size.get(), 1024);
384 assert!(small.max_lob_bytes.is_none());
385
386 let large = BatchConfig::large();
387 assert_eq!(large.batch_size.get(), 131072);
388 assert!(large.max_lob_bytes.is_none());
389 }
390
391 #[test]
392 fn test_batch_config_small() {
393 let config = BatchConfig::small();
394 assert_eq!(config.batch_size.get(), 1024);
395 assert_eq!(config.string_capacity, 64 * 1024);
396 assert_eq!(config.binary_capacity, 64 * 1024);
397 assert!(!config.coerce_types);
398 }
399
400 #[test]
401 fn test_batch_config_large() {
402 let config = BatchConfig::large();
403 assert_eq!(config.batch_size.get(), 131_072);
404 assert_eq!(config.string_capacity, 8 * 1024 * 1024);
405 assert_eq!(config.binary_capacity, 8 * 1024 * 1024);
406 assert!(!config.coerce_types);
407 }
408
409 #[test]
414 #[should_panic(expected = "batch_size must be non-zero")]
415 fn test_batch_config_zero_batch_size_panics() {
416 let _ = BatchConfig::with_batch_size(0);
417 }
418
419 #[test]
420 fn test_batch_config_try_with_zero_returns_none() {
421 assert!(BatchConfig::try_with_batch_size(0).is_none());
422 }
423
424 #[test]
425 fn test_batch_config_try_with_nonzero_returns_some() {
426 let config = BatchConfig::try_with_batch_size(100);
427 assert!(config.is_some());
428 assert_eq!(config.unwrap().batch_size.get(), 100);
429 }
430
431 #[test]
432 fn test_batch_config_zero_string_capacity() {
433 let config = BatchConfig::default().string_capacity(0);
434 assert_eq!(config.string_capacity, 0);
435 }
436
437 #[test]
438 fn test_batch_config_zero_binary_capacity() {
439 let config = BatchConfig::default().binary_capacity(0);
440 assert_eq!(config.binary_capacity, 0);
441 }
442
443 #[test]
444 fn test_batch_config_large_values() {
445 let config = BatchConfig::with_batch_size(1_000_000)
446 .string_capacity(100_000_000)
447 .binary_capacity(100_000_000);
448
449 assert_eq!(config.batch_size.get(), 1_000_000);
450 assert_eq!(config.string_capacity, 100_000_000);
451 assert_eq!(config.binary_capacity, 100_000_000);
452 }
453
454 #[test]
455 fn test_batch_config_batch_size_usize() {
456 let config = BatchConfig::with_batch_size(42);
457 assert_eq!(config.batch_size_usize(), 42);
458 }
459
460 #[test]
465 fn test_batch_config_clone() {
466 let config1 = BatchConfig::with_batch_size(100)
467 .string_capacity(200)
468 .max_lob_bytes(Some(1000));
469 let config2 = config1.clone();
470
471 assert_eq!(config1.batch_size, config2.batch_size);
472 assert_eq!(config1.string_capacity, config2.string_capacity);
473 assert_eq!(config1.binary_capacity, config2.binary_capacity);
474 assert_eq!(config1.coerce_types, config2.coerce_types);
475 assert_eq!(config1.max_lob_bytes, config2.max_lob_bytes);
476 }
477
478 #[test]
479 fn test_batch_config_debug() {
480 let config = BatchConfig::default();
481 let debug_str = format!("{:?}", config);
482 assert!(debug_str.contains("BatchConfig"));
483 assert!(debug_str.contains("batch_size"));
484 assert!(debug_str.contains("max_lob_bytes"));
485 }
486
487 #[test]
492 fn test_batch_config_override_after_preset() {
493 let config = BatchConfig::small()
494 .string_capacity(1_000_000)
495 .coerce_types(true);
496
497 assert_eq!(config.batch_size.get(), 1024);
498 assert_eq!(config.string_capacity, 1_000_000);
499 assert!(config.coerce_types);
500 }
501
502 #[test]
503 fn test_batch_config_multiple_overrides() {
504 let config = BatchConfig::default()
505 .string_capacity(100)
506 .string_capacity(200)
507 .string_capacity(300);
508
509 assert_eq!(config.string_capacity, 300);
510 }
511}