hdbconnect_arrow/traits/streaming.rs
1//! Streaming traits using Generic Associated Types (GATs).
2//!
3//! GATs enable streaming patterns where returned items can borrow from
4//! the iterator itself, avoiding unnecessary allocations.
5//!
6//! # Why GATs?
7//!
//! A standard [`Iterator`] cannot yield references into its own state: its
//! `Item` is a single associated type that cannot name the lifetime of the
//! `&mut self` borrow taken by `next()`. A GAT lets the item type carry a
//! lifetime parameter tied to that borrow, enabling zero-copy streaming.
12//!
13//! # Example
14//!
15//! ```rust,ignore
16//! impl LendingBatchIterator for MyReader {
17//! type Item<'a> = &'a RecordBatch where Self: 'a;
18//!
19//! fn next_batch(&mut self) -> Option<Result<Self::Item<'_>>> {
20//! // Return reference to internal buffer
21//! self.buffer.as_ref().map(Ok)
22//! }
23//! }
24//! ```
25
26use arrow_array::RecordBatch;
27use arrow_schema::SchemaRef;
28
29/// A lending iterator that yields borrowed record batches.
30///
31/// This trait uses GATs to allow the yielded items to borrow from `self`,
32/// enabling zero-copy streaming without intermediate allocations.
33///
34/// Unlike `Iterator`, which owns its items, `LendingBatchIterator` can
35/// yield references to internal buffers that are reused between iterations.
36pub trait LendingBatchIterator {
37 /// The type of items yielded by this iterator.
38 ///
39 /// The lifetime parameter `'a` allows items to borrow from `self`.
40 type Item<'a>
41 where
42 Self: 'a;
43
44 /// Advance the iterator and return the next batch.
45 ///
46 /// Returns `None` when iteration is complete.
47 fn next_batch(&mut self) -> Option<crate::Result<Self::Item<'_>>>;
48
49 /// Returns the schema of batches produced by this iterator.
50 fn schema(&self) -> SchemaRef;
51
52 /// Returns a hint of the remaining number of batches, if known.
53 ///
54 /// Returns `(lower_bound, upper_bound)` where `upper_bound` is `None`
55 /// if the count is unknown.
56 fn size_hint(&self) -> (usize, Option<usize>) {
57 (0, None)
58 }
59}
60
61/// A batch processor that transforms input rows into Arrow `RecordBatches`.
62///
63/// Uses GATs to allow flexible lifetime relationships between the processor
64/// and the batches it produces.
65pub trait BatchProcessor {
66 /// Configuration type for this processor.
67 type Config;
68
69 /// Error type produced by this processor.
70 type Error: std::error::Error;
71
72 /// The batch type produced, which may borrow from the processor.
73 type Batch<'a>
74 where
75 Self: 'a;
76
77 /// Create a new processor with the given configuration.
78 fn new(config: Self::Config, schema: SchemaRef) -> Self;
79
80 /// Process a chunk of rows into a batch.
81 ///
82 /// # Errors
83 ///
84 /// Returns an error if processing fails.
85 fn process<'a>(&'a mut self, rows: &[hdbconnect::Row]) -> Result<Self::Batch<'a>, Self::Error>;
86
87 /// Flush any buffered data and return the final batch.
88 ///
89 /// # Errors
90 ///
91 /// Returns an error if flushing fails.
92 fn flush(&mut self) -> Result<Option<RecordBatch>, Self::Error>;
93}
94
/// Configuration for batch processing.
///
/// Controls memory pre-allocation and conversion behavior when rows are
/// turned into Arrow batches.
///
/// Derives `PartialEq`/`Eq` so configurations can be compared directly,
/// e.g. `assert_eq!(cfg, BatchConfig::default())`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchConfig {
    /// Maximum number of rows per batch.
    ///
    /// Larger batches reduce per-batch overhead but use more memory.
    /// Default: 65536 (64K rows).
    pub batch_size: usize,

    /// Initial capacity for string builders, in bytes.
    ///
    /// Pre-allocating string capacity reduces reallocations.
    /// Default: 1MB.
    pub string_capacity: usize,

    /// Initial capacity for binary builders, in bytes.
    ///
    /// Pre-allocating binary capacity reduces reallocations.
    /// Default: 1MB.
    pub binary_capacity: usize,

    /// Whether to coerce types when possible.
    ///
    /// When true, numeric types may be widened (e.g., INT to BIGINT)
    /// to avoid precision loss. Default: false.
    pub coerce_types: bool,
}
124
125impl Default for BatchConfig {
126 fn default() -> Self {
127 Self {
128 batch_size: 65536,
129 string_capacity: 1024 * 1024, // 1MB
130 binary_capacity: 1024 * 1024, // 1MB
131 coerce_types: false,
132 }
133 }
134}
135
136impl BatchConfig {
137 /// Create a new configuration with the specified batch size.
138 #[must_use]
139 pub fn with_batch_size(batch_size: usize) -> Self {
140 Self {
141 batch_size,
142 ..Default::default()
143 }
144 }
145
146 /// Set the string builder capacity.
147 #[must_use]
148 pub const fn string_capacity(mut self, capacity: usize) -> Self {
149 self.string_capacity = capacity;
150 self
151 }
152
153 /// Set the binary builder capacity.
154 #[must_use]
155 pub const fn binary_capacity(mut self, capacity: usize) -> Self {
156 self.binary_capacity = capacity;
157 self
158 }
159
160 /// Enable or disable type coercion.
161 #[must_use]
162 pub const fn coerce_types(mut self, coerce: bool) -> Self {
163 self.coerce_types = coerce;
164 self
165 }
166
167 /// Create a configuration optimized for small result sets.
168 ///
169 /// Uses smaller batch size and buffer capacities.
170 #[must_use]
171 pub const fn small() -> Self {
172 Self {
173 batch_size: 1024,
174 string_capacity: 64 * 1024, // 64KB
175 binary_capacity: 64 * 1024, // 64KB
176 coerce_types: false,
177 }
178 }
179
180 /// Create a configuration optimized for large result sets.
181 ///
182 /// Uses larger batch size and buffer capacities.
183 #[must_use]
184 pub const fn large() -> Self {
185 Self {
186 batch_size: 131_072, // 128K rows
187 string_capacity: 8 * 1024 * 1024, // 8MB
188 binary_capacity: 8 * 1024 * 1024, // 8MB
189 coerce_types: false,
190 }
191 }
192}
193
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_batch_config_default() {
        let config = BatchConfig::default();
        assert_eq!(config.batch_size, 65536);
        assert_eq!(config.string_capacity, 1024 * 1024);
        assert_eq!(config.binary_capacity, 1024 * 1024);
        assert!(!config.coerce_types);
    }

    #[test]
    fn test_batch_config_builder() {
        let config = BatchConfig::with_batch_size(1000)
            .string_capacity(500)
            .binary_capacity(250)
            .coerce_types(true);

        assert_eq!(config.batch_size, 1000);
        assert_eq!(config.string_capacity, 500);
        assert_eq!(config.binary_capacity, 250);
        assert!(config.coerce_types);
    }

    #[test]
    fn test_with_batch_size_keeps_other_defaults() {
        // Only the batch size should differ from `Default`.
        let config = BatchConfig::with_batch_size(42);
        let default = BatchConfig::default();

        assert_eq!(config.batch_size, 42);
        assert_eq!(config.string_capacity, default.string_capacity);
        assert_eq!(config.binary_capacity, default.binary_capacity);
        assert_eq!(config.coerce_types, default.coerce_types);
    }

    #[test]
    fn test_batch_config_presets() {
        let small = BatchConfig::small();
        assert_eq!(small.batch_size, 1024);
        assert_eq!(small.string_capacity, 64 * 1024);
        assert_eq!(small.binary_capacity, 64 * 1024);
        assert!(!small.coerce_types);

        let large = BatchConfig::large();
        assert_eq!(large.batch_size, 131_072);
        assert_eq!(large.string_capacity, 8 * 1024 * 1024);
        assert_eq!(large.binary_capacity, 8 * 1024 * 1024);
        assert!(!large.coerce_types);
    }
}