hdbconnect_arrow/traits/streaming.rs

//! Streaming traits using Generic Associated Types (GATs).
//!
//! GATs enable streaming patterns where returned items can borrow from
//! the iterator itself, avoiding unnecessary allocations.
//!
//! # Why GATs?
//!
//! Traditional iterators cannot yield references to internal state because
//! the `Item` type is fixed at trait definition time. GATs allow the item
//! type to have a lifetime parameter tied to `&self`, enabling zero-copy
//! streaming.
//!
//! # Example
//!
//! ```rust,ignore
//! impl LendingBatchIterator for MyReader {
//!     type Item<'a> = &'a RecordBatch where Self: 'a;
//!
//!     fn next_batch(&mut self) -> Option<Result<Self::Item<'_>>> {
//!         // Return reference to internal buffer
//!         self.buffer.as_ref().map(Ok)
//!     }
//!
//!     fn schema(&self) -> SchemaRef {
//!         self.schema.clone()
//!     }
//! }
//! ```

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;

/// A lending iterator that yields borrowed record batches.
///
/// This trait uses GATs to allow the yielded items to borrow from `self`,
/// enabling zero-copy streaming without intermediate allocations.
///
/// Unlike `Iterator`, whose `Item` type cannot borrow from the iterator
/// itself, `LendingBatchIterator` can yield references to internal buffers
/// that are reused between iterations.
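///
/// # Example
///
/// A minimal consumption sketch; `reader` (any implementor of this trait)
/// and the `handle` helper are hypothetical:
///
/// ```rust,ignore
/// while let Some(batch) = reader.next_batch() {
///     let batch = batch?;
///     // `batch` borrows from `reader`, so it must go out of scope
///     // before the next call to `next_batch`.
///     handle(batch);
/// }
/// ```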
pub trait LendingBatchIterator {
    /// The type of items yielded by this iterator.
    ///
    /// The lifetime parameter `'a` allows items to borrow from `self`.
    type Item<'a>
    where
        Self: 'a;

    /// Advance the iterator and return the next batch.
    ///
    /// Returns `None` when iteration is complete.
    fn next_batch(&mut self) -> Option<crate::Result<Self::Item<'_>>>;

    /// Returns the schema of batches produced by this iterator.
    fn schema(&self) -> SchemaRef;

    /// Returns a hint of the remaining number of batches, if known.
    ///
    /// Returns `(lower_bound, upper_bound)` where `upper_bound` is `None`
    /// if the count is unknown.
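    ///
    /// A brief illustration of the convention (the `iter` value and the
    /// numbers are hypothetical):
    ///
    /// ```rust,ignore
    /// // Exactly three batches remain:
    /// assert_eq!(iter.size_hint(), (3, Some(3)));
    /// // Remaining count unknown (the default implementation):
    /// assert_eq!(iter.size_hint(), (0, None));
    /// ```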
    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}

/// A batch processor that transforms input rows into Arrow `RecordBatches`.
///
/// Uses GATs to allow flexible lifetime relationships between the processor
/// and the batches it produces.
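///
/// # Example
///
/// A minimal sketch of the intended call pattern. `MyProcessor` (whose
/// `Config` is assumed here to be `BatchConfig`), `schema`, `rows`, and the
/// `write_*` helpers are hypothetical:
///
/// ```rust,ignore
/// let mut processor = MyProcessor::new(BatchConfig::default(), schema);
/// for chunk in rows.chunks(1024) {
///     // The returned batch may borrow from `processor`, so hand it off
///     // before processing the next chunk.
///     let batch = processor.process(chunk)?;
///     write_borrowed(batch)?;
/// }
/// // Emit any rows still buffered inside the processor.
/// if let Some(final_batch) = processor.flush()? {
///     write_owned(final_batch)?;
/// }
/// ```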
pub trait BatchProcessor {
    /// Configuration type for this processor.
    type Config;

    /// Error type produced by this processor.
    type Error: std::error::Error;

    /// The batch type produced, which may borrow from the processor.
    type Batch<'a>
    where
        Self: 'a;

    /// Create a new processor with the given configuration.
    fn new(config: Self::Config, schema: SchemaRef) -> Self;

    /// Process a chunk of rows into a batch.
    ///
    /// # Errors
    ///
    /// Returns an error if processing fails.
    fn process<'a>(&'a mut self, rows: &[hdbconnect::Row]) -> Result<Self::Batch<'a>, Self::Error>;

    /// Flush any buffered data and return the final batch.
    ///
    /// # Errors
    ///
    /// Returns an error if flushing fails.
    fn flush(&mut self) -> Result<Option<RecordBatch>, Self::Error>;
}

/// Configuration for batch processing.
///
/// Controls memory allocation and processing behavior for batch conversion.
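///
/// # Example
///
/// A small sketch of building a configuration; the values are illustrative:
///
/// ```rust,ignore
/// // Start from a batch size, then tune individual capacities.
/// let config = BatchConfig::with_batch_size(32_768)
///     .string_capacity(2 * 1024 * 1024)
///     .coerce_types(true);
///
/// // Or pick a preset sized for the expected result set.
/// let small = BatchConfig::small();
/// let large = BatchConfig::large();
/// ```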
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum number of rows per batch.
    ///
    /// Larger batches reduce overhead but use more memory.
    /// Default: 65536 (64K rows).
    pub batch_size: usize,

    /// Initial capacity for string builders (bytes).
    ///
    /// Pre-allocating string capacity reduces reallocations.
    /// Default: 1MB.
    pub string_capacity: usize,

    /// Initial capacity for binary builders (bytes).
    ///
    /// Pre-allocating binary capacity reduces reallocations.
    /// Default: 1MB.
    pub binary_capacity: usize,

    /// Whether to coerce types when possible.
    ///
    /// When true, numeric types may be widened (e.g., INT to BIGINT)
    /// to avoid precision loss. Default: false.
    pub coerce_types: bool,
}

impl Default for BatchConfig {
    fn default() -> Self {
        Self {
            batch_size: 65536,
            string_capacity: 1024 * 1024, // 1MB
            binary_capacity: 1024 * 1024, // 1MB
            coerce_types: false,
        }
    }
}

impl BatchConfig {
    /// Create a new configuration with the specified batch size.
    #[must_use]
    pub fn with_batch_size(batch_size: usize) -> Self {
        Self {
            batch_size,
            ..Default::default()
        }
    }

    /// Set the string builder capacity.
    #[must_use]
    pub const fn string_capacity(mut self, capacity: usize) -> Self {
        self.string_capacity = capacity;
        self
    }

    /// Set the binary builder capacity.
    #[must_use]
    pub const fn binary_capacity(mut self, capacity: usize) -> Self {
        self.binary_capacity = capacity;
        self
    }

    /// Enable or disable type coercion.
    #[must_use]
    pub const fn coerce_types(mut self, coerce: bool) -> Self {
        self.coerce_types = coerce;
        self
    }

    /// Create a configuration optimized for small result sets.
    ///
    /// Uses smaller batch size and buffer capacities.
    #[must_use]
    pub const fn small() -> Self {
        Self {
            batch_size: 1024,
            string_capacity: 64 * 1024, // 64KB
            binary_capacity: 64 * 1024, // 64KB
            coerce_types: false,
        }
    }

    /// Create a configuration optimized for large result sets.
    ///
    /// Uses larger batch size and buffer capacities.
    #[must_use]
    pub const fn large() -> Self {
        Self {
            batch_size: 131_072,              // 128K rows
            string_capacity: 8 * 1024 * 1024, // 8MB
            binary_capacity: 8 * 1024 * 1024, // 8MB
            coerce_types: false,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_batch_config_default() {
        let config = BatchConfig::default();
        assert_eq!(config.batch_size, 65536);
        assert_eq!(config.string_capacity, 1024 * 1024);
        assert!(!config.coerce_types);
    }

    #[test]
    fn test_batch_config_builder() {
        let config = BatchConfig::with_batch_size(1000)
            .string_capacity(500)
            .coerce_types(true);

        assert_eq!(config.batch_size, 1000);
        assert_eq!(config.string_capacity, 500);
        assert!(config.coerce_types);
    }

    #[test]
    fn test_batch_config_presets() {
        let small = BatchConfig::small();
        assert_eq!(small.batch_size, 1024);

        let large = BatchConfig::large();
        assert_eq!(large.batch_size, 131072);
    }
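
    // A minimal sketch of the lending pattern: a mock iterator that yields a
    // borrowed batch a fixed number of times. `MockReader`, its fields, and
    // the empty schema are test-only assumptions, not part of the public API.
    #[test]
    fn test_lending_iterator_mock() {
        use arrow_schema::Schema;
        use std::sync::Arc;

        struct MockReader {
            buffer: RecordBatch,
            remaining: usize,
        }

        impl LendingBatchIterator for MockReader {
            type Item<'a> = &'a RecordBatch where Self: 'a;

            fn next_batch(&mut self) -> Option<crate::Result<Self::Item<'_>>> {
                if self.remaining == 0 {
                    return None;
                }
                self.remaining -= 1;
                Some(Ok(&self.buffer))
            }

            fn schema(&self) -> SchemaRef {
                self.buffer.schema()
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                (self.remaining, Some(self.remaining))
            }
        }

        let schema = Arc::new(Schema::empty());
        let mut reader = MockReader {
            buffer: RecordBatch::new_empty(schema),
            remaining: 2,
        };

        assert_eq!(reader.size_hint(), (2, Some(2)));
        let mut count = 0;
        while let Some(batch) = reader.next_batch() {
            // The borrowed batch is used and dropped within one iteration,
            // which is what allows the next `next_batch` call to borrow again.
            assert_eq!(batch.expect("mock never fails").num_rows(), 0);
            count += 1;
        }
        assert_eq!(count, 2);
    }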
}