Skip to main content

hdbconnect_arrow/traits/
streaming.rs

//! Streaming traits using Generic Associated Types (GATs).
//!
//! GATs enable streaming patterns where returned items can borrow from
//! the iterator itself, avoiding unnecessary allocations.
//!
//! # Why GATs?
//!
//! Traditional iterators cannot yield references to their own internal state
//! because `Iterator::Item` is a plain associated type: it cannot name the
//! lifetime of the `&mut self` borrow taken by `next`. GATs allow the item
//! type to take a lifetime parameter tied to that borrow of `self`, enabling
//! zero-copy streaming.
//!
//! # Example
//!
//! ```rust,ignore
//! impl LendingBatchIterator for MyReader {
//!     type Item<'a> = &'a RecordBatch where Self: 'a;
//!
//!     fn next_batch(&mut self) -> Option<Result<Self::Item<'_>>> {
//!         // Return reference to internal buffer
//!         self.buffer.as_ref().map(Ok)
//!     }
//! }
//! ```
25
26use std::num::NonZeroUsize;
27
28use arrow_array::RecordBatch;
29use arrow_schema::SchemaRef;
30
31/// A lending iterator that yields borrowed record batches.
32///
33/// This trait uses GATs to allow the yielded items to borrow from `self`,
34/// enabling zero-copy streaming without intermediate allocations.
35///
36/// Unlike `Iterator`, which owns its items, `LendingBatchIterator` can
37/// yield references to internal buffers that are reused between iterations.
38pub trait LendingBatchIterator {
39    /// The type of items yielded by this iterator.
40    ///
41    /// The lifetime parameter `'a` allows items to borrow from `self`.
42    type Item<'a>
43    where
44        Self: 'a;
45
46    /// Advance the iterator and return the next batch.
47    ///
48    /// Returns `None` when iteration is complete.
49    fn next_batch(&mut self) -> Option<crate::Result<Self::Item<'_>>>;
50
51    /// Returns the schema of batches produced by this iterator.
52    fn schema(&self) -> SchemaRef;
53
54    /// Returns a hint of the remaining number of batches, if known.
55    ///
56    /// Returns `(lower_bound, upper_bound)` where `upper_bound` is `None`
57    /// if the count is unknown.
58    fn size_hint(&self) -> (usize, Option<usize>) {
59        (0, None)
60    }
61}
62
63/// A batch processor that transforms input rows into Arrow `RecordBatches`.
64///
65/// Uses GATs to allow flexible lifetime relationships between the processor
66/// and the batches it produces.
67pub trait BatchProcessor {
68    /// Configuration type for this processor.
69    type Config;
70
71    /// Error type produced by this processor.
72    type Error: std::error::Error;
73
74    /// The batch type produced, which may borrow from the processor.
75    type Batch<'a>
76    where
77        Self: 'a;
78
79    /// Create a new processor with the given configuration.
80    fn new(config: Self::Config, schema: SchemaRef) -> Self;
81
82    /// Process a chunk of rows into a batch.
83    ///
84    /// # Errors
85    ///
86    /// Returns an error if processing fails.
87    fn process<'a>(&'a mut self, rows: &[hdbconnect::Row]) -> Result<Self::Batch<'a>, Self::Error>;
88
89    /// Flush any buffered data and return the final batch.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if flushing fails.
94    fn flush(&mut self) -> Result<Option<RecordBatch>, Self::Error>;
95}
96
/// Configuration for batch processing.
///
/// Controls memory allocation and processing behavior for batch conversion.
///
/// The type is plain data, so it also implements `PartialEq`/`Eq`: two
/// configurations compare equal exactly when all of their fields are equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchConfig {
    /// Maximum number of rows per batch.
    ///
    /// Stored as `NonZeroUsize` so that a batch size of zero (which would
    /// invite division-by-zero and infinite loops) is unrepresentable.
    /// Default: 65536 (64K rows).
    pub batch_size: NonZeroUsize,

    /// Initial capacity for string builders, in bytes.
    ///
    /// Pre-allocating string capacity reduces reallocations.
    /// Default: 1MB.
    pub string_capacity: usize,

    /// Initial capacity for binary builders, in bytes.
    ///
    /// Pre-allocating binary capacity reduces reallocations.
    /// Default: 1MB.
    pub binary_capacity: usize,

    /// Whether to coerce types when possible.
    ///
    /// When true, numeric types may be widened (e.g., INT to BIGINT)
    /// to avoid precision loss. Default: false.
    pub coerce_types: bool,

    /// Maximum LOB size in bytes before rejecting.
    ///
    /// When set, LOB values exceeding this size are meant to trigger an error
    /// instead of being materialized, which guards against OOM conditions
    /// when processing result sets with large LOB values.
    ///
    /// Default: None (no limit).
    pub max_lob_bytes: Option<usize>,
}
135
136impl Default for BatchConfig {
137    fn default() -> Self {
138        Self {
139            // SAFETY: 65536 is non-zero
140            batch_size: NonZeroUsize::new(65536).unwrap(),
141            string_capacity: 1024 * 1024, // 1MB
142            binary_capacity: 1024 * 1024, // 1MB
143            coerce_types: false,
144            max_lob_bytes: None,
145        }
146    }
147}
148
149impl BatchConfig {
150    /// Create a new configuration with the specified batch size.
151    ///
152    /// # Panics
153    ///
154    /// Panics if `batch_size` is zero. Use `try_with_batch_size` for fallible construction.
155    #[must_use]
156    pub fn with_batch_size(batch_size: usize) -> Self {
157        Self {
158            batch_size: NonZeroUsize::new(batch_size).expect("batch_size must be non-zero"),
159            ..Default::default()
160        }
161    }
162
163    /// Create a new configuration with the specified batch size.
164    ///
165    /// Returns `None` if `batch_size` is zero.
166    #[must_use]
167    pub fn try_with_batch_size(batch_size: usize) -> Option<Self> {
168        Some(Self {
169            batch_size: NonZeroUsize::new(batch_size)?,
170            ..Default::default()
171        })
172    }
173
174    /// Get batch size as usize for iteration.
175    #[must_use]
176    pub const fn batch_size_usize(&self) -> usize {
177        self.batch_size.get()
178    }
179
180    /// Set the string builder capacity.
181    #[must_use]
182    pub const fn string_capacity(mut self, capacity: usize) -> Self {
183        self.string_capacity = capacity;
184        self
185    }
186
187    /// Set the binary builder capacity.
188    #[must_use]
189    pub const fn binary_capacity(mut self, capacity: usize) -> Self {
190        self.binary_capacity = capacity;
191        self
192    }
193
194    /// Enable or disable type coercion.
195    #[must_use]
196    pub const fn coerce_types(mut self, coerce: bool) -> Self {
197        self.coerce_types = coerce;
198        self
199    }
200
201    /// Set the maximum LOB size in bytes.
202    ///
203    /// LOB values exceeding this size will cause an error during conversion.
204    /// Set to `None` to disable the limit (default).
205    #[must_use]
206    pub const fn max_lob_bytes(mut self, max: Option<usize>) -> Self {
207        self.max_lob_bytes = max;
208        self
209    }
210
211    /// Create a configuration optimized for small result sets.
212    ///
213    /// Uses smaller batch size and buffer capacities.
214    ///
215    /// # Panics
216    ///
217    /// Never panics - the batch size is a compile-time constant.
218    #[must_use]
219    pub const fn small() -> Self {
220        Self {
221            // SAFETY: 1024 is non-zero - unwrap() in const context panics at compile time if None
222            batch_size: match NonZeroUsize::new(1024) {
223                Some(v) => v,
224                None => panic!("batch_size must be non-zero"),
225            },
226            string_capacity: 64 * 1024, // 64KB
227            binary_capacity: 64 * 1024, // 64KB
228            coerce_types: false,
229            max_lob_bytes: None,
230        }
231    }
232
233    /// Create a configuration optimized for large result sets.
234    ///
235    /// Uses larger batch size and buffer capacities.
236    ///
237    /// # Panics
238    ///
239    /// Never panics - the batch size is a compile-time constant.
240    #[must_use]
241    pub const fn large() -> Self {
242        Self {
243            // SAFETY: 131_072 is non-zero - unwrap() in const context panics at compile time if
244            // None
245            batch_size: match NonZeroUsize::new(131_072) {
246                Some(v) => v,
247                None => panic!("batch_size must be non-zero"),
248            },
249            string_capacity: 8 * 1024 * 1024, // 8MB
250            binary_capacity: 8 * 1024 * 1024, // 8MB
251            coerce_types: false,
252            max_lob_bytes: None,
253        }
254    }
255}
256
#[cfg(test)]
mod tests {
    use super::*;

    /// One kibibyte, used to spell out the preset capacities.
    const KIB: usize = 1024;
    /// One mebibyte, the default builder capacity.
    const MIB: usize = 1024 * KIB;

    /// Row count of `config` as a plain integer.
    fn rows(config: &BatchConfig) -> usize {
        config.batch_size.get()
    }

    // ---------------------------------------------------------------------
    // Defaults
    // ---------------------------------------------------------------------

    #[test]
    fn test_batch_config_default() {
        let BatchConfig {
            batch_size,
            string_capacity,
            coerce_types,
            max_lob_bytes,
            ..
        } = BatchConfig::default();

        assert_eq!(batch_size.get(), 65536);
        assert_eq!(string_capacity, MIB);
        assert!(!coerce_types);
        assert!(max_lob_bytes.is_none());
    }

    #[test]
    fn test_batch_config_default_binary_capacity() {
        assert_eq!(BatchConfig::default().binary_capacity, MIB);
    }

    // ---------------------------------------------------------------------
    // Builder-style setters
    // ---------------------------------------------------------------------

    #[test]
    fn test_batch_config_builder() {
        let built = BatchConfig::with_batch_size(1000)
            .string_capacity(500)
            .coerce_types(true);

        assert_eq!(rows(&built), 1000);
        assert_eq!(built.string_capacity, 500);
        assert!(built.coerce_types);
    }

    #[test]
    fn test_batch_config_with_batch_size() {
        let built = BatchConfig::with_batch_size(100);

        assert_eq!(rows(&built), 100);
        assert_eq!(built.string_capacity, MIB);
        assert_eq!(built.binary_capacity, MIB);
        assert!(!built.coerce_types);
    }

    #[test]
    fn test_batch_config_string_capacity() {
        assert_eq!(
            BatchConfig::default().string_capacity(2048).string_capacity,
            2048
        );
    }

    #[test]
    fn test_batch_config_binary_capacity() {
        assert_eq!(
            BatchConfig::default().binary_capacity(4096).binary_capacity,
            4096
        );
    }

    #[test]
    fn test_batch_config_coerce_types_true() {
        assert!(BatchConfig::default().coerce_types(true).coerce_types);
    }

    #[test]
    fn test_batch_config_coerce_types_false() {
        assert!(!BatchConfig::default().coerce_types(false).coerce_types);
    }

    #[test]
    fn test_batch_config_builder_chaining() {
        let built = BatchConfig::with_batch_size(5000)
            .string_capacity(10000)
            .binary_capacity(20000)
            .coerce_types(true);

        assert_eq!(rows(&built), 5000);
        assert_eq!(built.string_capacity, 10000);
        assert_eq!(built.binary_capacity, 20000);
        assert!(built.coerce_types);
    }

    // ---------------------------------------------------------------------
    // max_lob_bytes
    // ---------------------------------------------------------------------

    #[test]
    fn test_batch_config_max_lob_bytes_none() {
        assert!(BatchConfig::default().max_lob_bytes.is_none());
    }

    #[test]
    fn test_batch_config_max_lob_bytes_some() {
        let limited = BatchConfig::default().max_lob_bytes(Some(50_000_000));
        assert_eq!(limited.max_lob_bytes, Some(50_000_000));
    }

    #[test]
    fn test_batch_config_max_lob_bytes_reset_to_none() {
        let reset = BatchConfig::default()
            .max_lob_bytes(Some(1000))
            .max_lob_bytes(None);
        assert!(reset.max_lob_bytes.is_none());
    }

    #[test]
    fn test_batch_config_max_lob_bytes_chaining() {
        let built = BatchConfig::with_batch_size(1000)
            .string_capacity(500)
            .max_lob_bytes(Some(10_000_000));

        assert_eq!(rows(&built), 1000);
        assert_eq!(built.string_capacity, 500);
        assert_eq!(built.max_lob_bytes, Some(10_000_000));
    }

    // ---------------------------------------------------------------------
    // Presets
    // ---------------------------------------------------------------------

    #[test]
    fn test_batch_config_presets() {
        let small = BatchConfig::small();
        assert_eq!(rows(&small), 1024);
        assert!(small.max_lob_bytes.is_none());

        let large = BatchConfig::large();
        assert_eq!(rows(&large), 131072);
        assert!(large.max_lob_bytes.is_none());
    }

    #[test]
    fn test_batch_config_small() {
        let preset = BatchConfig::small();

        assert_eq!(rows(&preset), 1024);
        assert_eq!(preset.string_capacity, 64 * KIB);
        assert_eq!(preset.binary_capacity, 64 * KIB);
        assert!(!preset.coerce_types);
    }

    #[test]
    fn test_batch_config_large() {
        let preset = BatchConfig::large();

        assert_eq!(rows(&preset), 131_072);
        assert_eq!(preset.string_capacity, 8 * MIB);
        assert_eq!(preset.binary_capacity, 8 * MIB);
        assert!(!preset.coerce_types);
    }

    // ---------------------------------------------------------------------
    // Edge cases
    // ---------------------------------------------------------------------

    #[test]
    #[should_panic(expected = "batch_size must be non-zero")]
    fn test_batch_config_zero_batch_size_panics() {
        // The result is `#[must_use]`; bind it explicitly to discard it.
        let _ = BatchConfig::with_batch_size(0);
    }

    #[test]
    fn test_batch_config_try_with_zero_returns_none() {
        assert!(BatchConfig::try_with_batch_size(0).is_none());
    }

    #[test]
    fn test_batch_config_try_with_nonzero_returns_some() {
        let built = BatchConfig::try_with_batch_size(100)
            .expect("a non-zero batch size must be accepted");
        assert_eq!(rows(&built), 100);
    }

    #[test]
    fn test_batch_config_zero_string_capacity() {
        assert_eq!(BatchConfig::default().string_capacity(0).string_capacity, 0);
    }

    #[test]
    fn test_batch_config_zero_binary_capacity() {
        assert_eq!(BatchConfig::default().binary_capacity(0).binary_capacity, 0);
    }

    #[test]
    fn test_batch_config_large_values() {
        let built = BatchConfig::with_batch_size(1_000_000)
            .string_capacity(100_000_000)
            .binary_capacity(100_000_000);

        assert_eq!(rows(&built), 1_000_000);
        assert_eq!(built.string_capacity, 100_000_000);
        assert_eq!(built.binary_capacity, 100_000_000);
    }

    #[test]
    fn test_batch_config_batch_size_usize() {
        assert_eq!(BatchConfig::with_batch_size(42).batch_size_usize(), 42);
    }

    // ---------------------------------------------------------------------
    // Clone and Debug
    // ---------------------------------------------------------------------

    #[test]
    fn test_batch_config_clone() {
        let original = BatchConfig::with_batch_size(100)
            .string_capacity(200)
            .max_lob_bytes(Some(1000));
        let duplicate = original.clone();

        assert_eq!(original.batch_size, duplicate.batch_size);
        assert_eq!(original.string_capacity, duplicate.string_capacity);
        assert_eq!(original.binary_capacity, duplicate.binary_capacity);
        assert_eq!(original.coerce_types, duplicate.coerce_types);
        assert_eq!(original.max_lob_bytes, duplicate.max_lob_bytes);
    }

    #[test]
    fn test_batch_config_debug() {
        let config = BatchConfig::default();
        let rendered = format!("{config:?}");

        for needle in ["BatchConfig", "batch_size", "max_lob_bytes"] {
            assert!(rendered.contains(needle));
        }
    }

    // ---------------------------------------------------------------------
    // Overrides
    // ---------------------------------------------------------------------

    #[test]
    fn test_batch_config_override_after_preset() {
        let tuned = BatchConfig::small()
            .string_capacity(1_000_000)
            .coerce_types(true);

        assert_eq!(rows(&tuned), 1024);
        assert_eq!(tuned.string_capacity, 1_000_000);
        assert!(tuned.coerce_types);
    }

    #[test]
    fn test_batch_config_multiple_overrides() {
        let last_wins = BatchConfig::default()
            .string_capacity(100)
            .string_capacity(200)
            .string_capacity(300);

        assert_eq!(last_wins.string_capacity, 300);
    }
}