Skip to main content

hdbconnect_arrow/traits/
streaming.rs

//! Streaming traits using Generic Associated Types (GATs).
//!
//! GATs enable streaming patterns where returned items can borrow from
//! the iterator itself, avoiding unnecessary allocations.
//!
//! # Why GATs?
//!
//! A traditional [`Iterator`] cannot yield references to its own internal
//! state, because its `Item` type is fixed independently of any call to
//! `next`. GATs allow the item type to take a lifetime parameter tied to the
//! `&mut self` borrow of each call, which enables zero-copy streaming.
//!
//! # Example
//!
//! The example is abbreviated. A real implementation must also provide
//! `schema`, and must refill or clear `buffer` between calls so that
//! iteration eventually returns `None`.
//!
//! ```rust,ignore
//! impl LendingBatchIterator for MyReader {
//!     type Item<'a> = &'a RecordBatch where Self: 'a;
//!
//!     fn next_batch(&mut self) -> Option<Result<Self::Item<'_>>> {
//!         // Return reference to internal buffer
//!         self.buffer.as_ref().map(Ok)
//!     }
//! }
//! ```
25
26use std::num::NonZeroUsize;
27
28use arrow_array::RecordBatch;
29use arrow_schema::SchemaRef;
30
31/// A lending iterator that yields borrowed record batches.
32///
33/// This trait uses GATs to allow the yielded items to borrow from `self`,
34/// enabling zero-copy streaming without intermediate allocations.
35///
36/// Unlike `Iterator`, which owns its items, `LendingBatchIterator` can
37/// yield references to internal buffers that are reused between iterations.
38pub trait LendingBatchIterator {
39    /// The type of items yielded by this iterator.
40    ///
41    /// The lifetime parameter `'a` allows items to borrow from `self`.
42    type Item<'a>
43    where
44        Self: 'a;
45
46    /// Advance the iterator and return the next batch.
47    ///
48    /// Returns `None` when iteration is complete.
49    fn next_batch(&mut self) -> Option<crate::Result<Self::Item<'_>>>;
50
51    /// Returns the schema of batches produced by this iterator.
52    fn schema(&self) -> SchemaRef;
53
54    /// Returns a hint of the remaining number of batches, if known.
55    ///
56    /// Returns `(lower_bound, upper_bound)` where `upper_bound` is `None`
57    /// if the count is unknown.
58    fn size_hint(&self) -> (usize, Option<usize>) {
59        (0, None)
60    }
61}
62
63/// A batch processor that transforms input rows into Arrow `RecordBatches`.
64///
65/// Uses GATs to allow flexible lifetime relationships between the processor
66/// and the batches it produces.
67pub trait BatchProcessor {
68    /// Configuration type for this processor.
69    type Config;
70
71    /// Error type produced by this processor.
72    type Error: std::error::Error;
73
74    /// The batch type produced, which may borrow from the processor.
75    type Batch<'a>
76    where
77        Self: 'a;
78
79    /// Create a new processor with the given configuration.
80    fn new(config: Self::Config, schema: SchemaRef) -> Self;
81
82    /// Process a chunk of rows into a batch.
83    ///
84    /// # Errors
85    ///
86    /// Returns an error if processing fails.
87    fn process<'a>(&'a mut self, rows: &[hdbconnect::Row]) -> Result<Self::Batch<'a>, Self::Error>;
88
89    /// Flush any buffered data and return the final batch.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if flushing fails.
94    fn flush(&mut self) -> Result<Option<RecordBatch>, Self::Error>;
95}
96
/// Configuration for batch processing.
///
/// Controls memory allocation and processing behavior for batch conversion.
/// Start from [`Default::default`], [`BatchConfig::with_batch_size`],
/// [`BatchConfig::small`] or [`BatchConfig::large`], then adjust individual
/// settings with the builder methods.
///
/// All fields are plain values, so configurations can be compared directly
/// with `==`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchConfig {
    /// Maximum number of rows per batch.
    ///
    /// Uses `NonZeroUsize` to prevent division-by-zero and infinite loops.
    /// Default: 65536 (64K rows).
    pub batch_size: NonZeroUsize,

    /// Initial capacity for string builders (bytes).
    ///
    /// Pre-allocating string capacity reduces reallocations.
    /// Default: 1 MiB.
    pub string_capacity: usize,

    /// Initial capacity for binary builders (bytes).
    ///
    /// Pre-allocating binary capacity reduces reallocations.
    /// Default: 1 MiB.
    pub binary_capacity: usize,

    /// Whether to coerce types when possible.
    ///
    /// When true, numeric types may be widened (e.g., INT to BIGINT)
    /// to avoid precision loss. Default: false.
    pub coerce_types: bool,
}
126
127impl Default for BatchConfig {
128    fn default() -> Self {
129        Self {
130            // SAFETY: 65536 is non-zero
131            batch_size: NonZeroUsize::new(65536).unwrap(),
132            string_capacity: 1024 * 1024, // 1MB
133            binary_capacity: 1024 * 1024, // 1MB
134            coerce_types: false,
135        }
136    }
137}
138
139impl BatchConfig {
140    /// Create a new configuration with the specified batch size.
141    ///
142    /// # Panics
143    ///
144    /// Panics if `batch_size` is zero. Use `try_with_batch_size` for fallible construction.
145    #[must_use]
146    pub fn with_batch_size(batch_size: usize) -> Self {
147        Self {
148            batch_size: NonZeroUsize::new(batch_size).expect("batch_size must be non-zero"),
149            ..Default::default()
150        }
151    }
152
153    /// Create a new configuration with the specified batch size.
154    ///
155    /// Returns `None` if `batch_size` is zero.
156    #[must_use]
157    pub fn try_with_batch_size(batch_size: usize) -> Option<Self> {
158        Some(Self {
159            batch_size: NonZeroUsize::new(batch_size)?,
160            ..Default::default()
161        })
162    }
163
164    /// Get batch size as usize for iteration.
165    #[must_use]
166    pub const fn batch_size_usize(&self) -> usize {
167        self.batch_size.get()
168    }
169
170    /// Set the string builder capacity.
171    #[must_use]
172    pub const fn string_capacity(mut self, capacity: usize) -> Self {
173        self.string_capacity = capacity;
174        self
175    }
176
177    /// Set the binary builder capacity.
178    #[must_use]
179    pub const fn binary_capacity(mut self, capacity: usize) -> Self {
180        self.binary_capacity = capacity;
181        self
182    }
183
184    /// Enable or disable type coercion.
185    #[must_use]
186    pub const fn coerce_types(mut self, coerce: bool) -> Self {
187        self.coerce_types = coerce;
188        self
189    }
190
191    /// Create a configuration optimized for small result sets.
192    ///
193    /// Uses smaller batch size and buffer capacities.
194    ///
195    /// # Panics
196    ///
197    /// Never panics - the batch size is a compile-time constant.
198    #[must_use]
199    pub const fn small() -> Self {
200        Self {
201            // SAFETY: 1024 is non-zero - unwrap() in const context panics at compile time if None
202            batch_size: match NonZeroUsize::new(1024) {
203                Some(v) => v,
204                None => panic!("batch_size must be non-zero"),
205            },
206            string_capacity: 64 * 1024, // 64KB
207            binary_capacity: 64 * 1024, // 64KB
208            coerce_types: false,
209        }
210    }
211
212    /// Create a configuration optimized for large result sets.
213    ///
214    /// Uses larger batch size and buffer capacities.
215    ///
216    /// # Panics
217    ///
218    /// Never panics - the batch size is a compile-time constant.
219    #[must_use]
220    pub const fn large() -> Self {
221        Self {
222            // SAFETY: 131_072 is non-zero - unwrap() in const context panics at compile time if
223            // None
224            batch_size: match NonZeroUsize::new(131_072) {
225                Some(v) => v,
226                None => panic!("batch_size must be non-zero"),
227            },
228            string_capacity: 8 * 1024 * 1024, // 8MB
229            binary_capacity: 8 * 1024 * 1024, // 8MB
230            coerce_types: false,
231        }
232    }
233}
234
#[cfg(test)]
mod tests {
    use super::*;

    // ═══════════════════════════════════════════════════════════════════════════
    // BatchConfig Default Tests
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_batch_config_default() {
        let config = BatchConfig::default();
        assert_eq!(config.batch_size.get(), 65_536);
        assert_eq!(config.string_capacity, 1024 * 1024);
        assert_eq!(config.binary_capacity, 1024 * 1024);
        assert!(!config.coerce_types);
    }

    #[test]
    fn test_batch_config_default_binary_capacity() {
        let config = BatchConfig::default();
        assert_eq!(config.binary_capacity, 1024 * 1024);
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // BatchConfig Builder Pattern Tests
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_batch_config_builder() {
        let config = BatchConfig::with_batch_size(1000)
            .string_capacity(500)
            .coerce_types(true);

        assert_eq!(config.batch_size.get(), 1000);
        assert_eq!(config.string_capacity, 500);
        assert!(config.coerce_types);
    }

    #[test]
    fn test_batch_config_with_batch_size() {
        let config = BatchConfig::with_batch_size(100);
        assert_eq!(config.batch_size.get(), 100);
        assert_eq!(config.string_capacity, 1024 * 1024);
        assert_eq!(config.binary_capacity, 1024 * 1024);
        assert!(!config.coerce_types);
    }

    #[test]
    fn test_batch_config_string_capacity() {
        let config = BatchConfig::default().string_capacity(2048);
        assert_eq!(config.string_capacity, 2048);
    }

    #[test]
    fn test_batch_config_binary_capacity() {
        let config = BatchConfig::default().binary_capacity(4096);
        assert_eq!(config.binary_capacity, 4096);
    }

    #[test]
    fn test_batch_config_coerce_types_true() {
        let config = BatchConfig::default().coerce_types(true);
        assert!(config.coerce_types);
    }

    #[test]
    fn test_batch_config_coerce_types_false() {
        let config = BatchConfig::default().coerce_types(false);
        assert!(!config.coerce_types);
    }

    #[test]
    fn test_batch_config_builder_chaining() {
        let config = BatchConfig::with_batch_size(5000)
            .string_capacity(10_000)
            .binary_capacity(20_000)
            .coerce_types(true);

        assert_eq!(config.batch_size.get(), 5000);
        assert_eq!(config.string_capacity, 10_000);
        assert_eq!(config.binary_capacity, 20_000);
        assert!(config.coerce_types);
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // BatchConfig Preset Tests
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_batch_config_presets() {
        let small = BatchConfig::small();
        assert_eq!(small.batch_size.get(), 1024);

        let large = BatchConfig::large();
        assert_eq!(large.batch_size.get(), 131_072);
    }

    #[test]
    fn test_batch_config_small() {
        let config = BatchConfig::small();
        assert_eq!(config.batch_size.get(), 1024);
        assert_eq!(config.string_capacity, 64 * 1024);
        assert_eq!(config.binary_capacity, 64 * 1024);
        assert!(!config.coerce_types);
    }

    #[test]
    fn test_batch_config_large() {
        let config = BatchConfig::large();
        assert_eq!(config.batch_size.get(), 131_072);
        assert_eq!(config.string_capacity, 8 * 1024 * 1024);
        assert_eq!(config.binary_capacity, 8 * 1024 * 1024);
        assert!(!config.coerce_types);
    }

    #[test]
    fn test_batch_config_presets_are_const_evaluable() {
        // The presets are `const fn`s; make sure they stay usable in const
        // context, which is what lets callers build static configurations.
        const SMALL: BatchConfig = BatchConfig::small();
        const LARGE: BatchConfig = BatchConfig::large();

        assert_eq!(SMALL.batch_size_usize(), 1024);
        assert_eq!(LARGE.batch_size_usize(), 131_072);
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // BatchConfig Edge Cases
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    #[should_panic(expected = "batch_size must be non-zero")]
    fn test_batch_config_zero_batch_size_panics() {
        let _ = BatchConfig::with_batch_size(0);
    }

    #[test]
    fn test_batch_config_try_with_zero_returns_none() {
        assert!(BatchConfig::try_with_batch_size(0).is_none());
    }

    #[test]
    fn test_batch_config_try_with_nonzero_returns_some() {
        let config = BatchConfig::try_with_batch_size(100)
            .expect("a non-zero batch size must be accepted");
        assert_eq!(config.batch_size.get(), 100);
    }

    #[test]
    fn test_batch_config_try_with_keeps_defaults() {
        let config = BatchConfig::try_with_batch_size(7)
            .expect("a non-zero batch size must be accepted");
        let defaults = BatchConfig::default();

        assert_eq!(config.string_capacity, defaults.string_capacity);
        assert_eq!(config.binary_capacity, defaults.binary_capacity);
        assert_eq!(config.coerce_types, defaults.coerce_types);
    }

    #[test]
    fn test_batch_config_zero_string_capacity() {
        let config = BatchConfig::default().string_capacity(0);
        assert_eq!(config.string_capacity, 0);
    }

    #[test]
    fn test_batch_config_zero_binary_capacity() {
        let config = BatchConfig::default().binary_capacity(0);
        assert_eq!(config.binary_capacity, 0);
    }

    #[test]
    fn test_batch_config_large_values() {
        let config = BatchConfig::with_batch_size(1_000_000)
            .string_capacity(100_000_000)
            .binary_capacity(100_000_000);

        assert_eq!(config.batch_size.get(), 1_000_000);
        assert_eq!(config.string_capacity, 100_000_000);
        assert_eq!(config.binary_capacity, 100_000_000);
    }

    #[test]
    fn test_batch_config_batch_size_usize() {
        let config = BatchConfig::with_batch_size(42);
        assert_eq!(config.batch_size_usize(), 42);
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // BatchConfig Clone and Debug Tests
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_batch_config_clone() {
        let config1 = BatchConfig::with_batch_size(100).string_capacity(200);
        let config2 = config1.clone();

        assert_eq!(config1.batch_size, config2.batch_size);
        assert_eq!(config1.string_capacity, config2.string_capacity);
        assert_eq!(config1.binary_capacity, config2.binary_capacity);
        assert_eq!(config1.coerce_types, config2.coerce_types);
    }

    #[test]
    fn test_batch_config_debug() {
        let config = BatchConfig::default();
        let debug_str = format!("{config:?}");
        assert!(debug_str.contains("BatchConfig"));
        assert!(debug_str.contains("batch_size"));
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // BatchConfig Override Tests
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_batch_config_override_after_preset() {
        let config = BatchConfig::small()
            .string_capacity(1_000_000)
            .coerce_types(true);

        assert_eq!(config.batch_size.get(), 1024);
        assert_eq!(config.string_capacity, 1_000_000);
        assert!(config.coerce_types);
    }

    #[test]
    fn test_batch_config_multiple_overrides() {
        let config = BatchConfig::default()
            .string_capacity(100)
            .string_capacity(200)
            .string_capacity(300);

        assert_eq!(config.string_capacity, 300);
    }
}