1use std::any::Any;
5
6use vortex_buffer::BufferMut;
7use vortex_error::VortexExpect;
8use vortex_error::VortexResult;
9use vortex_error::vortex_ensure;
10use vortex_error::vortex_err;
11use vortex_error::vortex_panic;
12use vortex_mask::Mask;
13
14use crate::ArrayRef;
15use crate::IntoArray;
16use crate::LEGACY_SESSION;
17use crate::ToCanonical;
18use crate::VortexSessionExecute;
19use crate::arrays::DecimalArray;
20use crate::builders::ArrayBuilder;
21use crate::builders::DEFAULT_BUILDER_CAPACITY;
22use crate::builders::LazyBitBufferBuilder;
23use crate::canonical::Canonical;
24use crate::dtype::BigCast;
25use crate::dtype::DType;
26use crate::dtype::DecimalDType;
27use crate::dtype::NativeDecimalType;
28use crate::dtype::Nullability;
29use crate::dtype::i256;
30use crate::match_each_decimal_value;
31use crate::match_each_decimal_value_type;
32use crate::scalar::DecimalValue;
33use crate::scalar::Scalar;
34
35pub struct DecimalBuilder {
41 dtype: DType,
42 values: DecimalBuffer,
43 nulls: LazyBitBufferBuilder,
44}
45
46enum DecimalBuffer {
52 I8(BufferMut<i8>),
53 I16(BufferMut<i16>),
54 I32(BufferMut<i32>),
55 I64(BufferMut<i64>),
56 I128(BufferMut<i128>),
57 I256(BufferMut<i256>),
58}
59
60macro_rules! delegate_fn {
61 ($self:expr, | $tname:ident, $buffer:ident | $body:block) => {{
62 #[allow(unused)]
63 match $self {
64 DecimalBuffer::I8(buffer) => {
65 type $tname = i8;
66 let $buffer = buffer;
67 $body
68 }
69 DecimalBuffer::I16(buffer) => {
70 type $tname = i16;
71 let $buffer = buffer;
72 $body
73 }
74 DecimalBuffer::I32(buffer) => {
75 type $tname = i32;
76 let $buffer = buffer;
77 $body
78 }
79 DecimalBuffer::I64(buffer) => {
80 type $tname = i64;
81 let $buffer = buffer;
82 $body
83 }
84 DecimalBuffer::I128(buffer) => {
85 type $tname = i128;
86 let $buffer = buffer;
87 $body
88 }
89 DecimalBuffer::I256(buffer) => {
90 type $tname = i256;
91 let $buffer = buffer;
92 $body
93 }
94 }
95 }};
96}
97
98impl DecimalBuilder {
99 pub fn new<T: NativeDecimalType>(decimal: DecimalDType, nullability: Nullability) -> Self {
101 Self::with_capacity::<T>(DEFAULT_BUILDER_CAPACITY, decimal, nullability)
102 }
103
104 pub fn with_capacity<T: NativeDecimalType>(
106 capacity: usize,
107 decimal: DecimalDType,
108 nullability: Nullability,
109 ) -> Self {
110 Self {
111 dtype: DType::Decimal(decimal, nullability),
112 values: match_each_decimal_value_type!(T::DECIMAL_TYPE, |D| {
113 DecimalBuffer::from(BufferMut::<D>::with_capacity(capacity))
114 }),
115 nulls: LazyBitBufferBuilder::new(capacity),
116 }
117 }
118
119 pub fn append_value<V: NativeDecimalType>(&mut self, value: V) {
121 self.values.push(value);
122 self.nulls.append_non_null();
123 }
124
125 pub fn append_n_values<V: NativeDecimalType>(&mut self, value: V, n: usize) {
127 self.values.push_n(value, n);
128 self.nulls.append_n_non_nulls(n);
129 }
130
131 pub fn finish_into_decimal(&mut self) -> DecimalArray {
133 let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
134
135 let decimal_dtype = *self.decimal_dtype();
136
137 delegate_fn!(std::mem::take(&mut self.values), |T, values| {
138 DecimalArray::new::<T>(values.freeze(), decimal_dtype, validity)
139 })
140 }
141
142 pub fn decimal_dtype(&self) -> &DecimalDType {
144 let DType::Decimal(decimal_dtype, _) = &self.dtype else {
145 vortex_panic!("`DecimalBuilder` somehow had dtype {}", self.dtype);
146 };
147
148 decimal_dtype
149 }
150}
151
152impl ArrayBuilder for DecimalBuilder {
153 fn as_any(&self) -> &dyn Any {
154 self
155 }
156
157 fn as_any_mut(&mut self) -> &mut dyn Any {
158 self
159 }
160
161 fn dtype(&self) -> &DType {
162 &self.dtype
163 }
164
165 fn len(&self) -> usize {
166 self.values.len()
167 }
168
169 fn append_zeros(&mut self, n: usize) {
170 self.values.push_n(0, n);
171 self.nulls.append_n_non_nulls(n);
172 }
173
174 unsafe fn append_nulls_unchecked(&mut self, n: usize) {
175 self.values.push_n(0, n);
176 self.nulls.append_n_nulls(n);
177 }
178
179 fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
180 vortex_ensure!(
181 scalar.dtype() == self.dtype(),
182 "DecimalBuilder expected scalar with dtype {}, got {}",
183 self.dtype(),
184 scalar.dtype()
185 );
186
187 match scalar.as_decimal().decimal_value() {
188 None => self.append_null(),
189 Some(v) => match_each_decimal_value!(v, |dec_val| {
190 self.append_value(dec_val);
191 }),
192 }
193
194 Ok(())
195 }
196
197 unsafe fn extend_from_array_unchecked(&mut self, array: &ArrayRef) {
198 let decimal_array = array.to_decimal();
199
200 match_each_decimal_value_type!(decimal_array.values_type(), |D| {
201 self.values
204 .extend(decimal_array.buffer::<D>().iter().copied());
205 });
206
207 self.nulls.append_validity_mask(
208 decimal_array
209 .as_ref()
210 .validity()
211 .vortex_expect("validity_mask")
212 .to_mask(
213 decimal_array.as_ref().len(),
214 &mut LEGACY_SESSION.create_execution_ctx(),
215 )
216 .vortex_expect("Failed to compute validity mask"),
217 );
218 }
219
220 fn reserve_exact(&mut self, additional: usize) {
221 self.values.reserve(additional);
222 self.nulls.reserve_exact(additional);
223 }
224
225 unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
226 self.nulls = LazyBitBufferBuilder::new(validity.len());
227 self.nulls.append_validity_mask(validity);
228 }
229
230 fn finish(&mut self) -> ArrayRef {
231 self.finish_into_decimal().into_array()
232 }
233
234 fn finish_into_canonical(&mut self) -> Canonical {
235 Canonical::Decimal(self.finish_into_decimal())
236 }
237}
238
239impl DecimalBuffer {
240 fn push<V: NativeDecimalType>(&mut self, value: V) {
241 delegate_fn!(self, |T, buffer| {
242 buffer.push(
243 <T as BigCast>::from(value)
244 .ok_or_else(|| {
245 vortex_err!(
246 "decimal conversion failure {:?}, type: {:?} to {:?}",
247 value,
248 V::DECIMAL_TYPE,
249 T::DECIMAL_TYPE,
250 )
251 })
252 .vortex_expect("operation should succeed in builder"),
253 )
254 });
255 }
256
257 fn push_n<V: NativeDecimalType>(&mut self, value: V, n: usize) {
258 delegate_fn!(self, |T, buffer| {
259 buffer.push_n(
260 <T as BigCast>::from(value).vortex_expect("decimal conversion failure"),
261 n,
262 )
263 });
264 }
265
266 fn reserve(&mut self, additional: usize) {
267 delegate_fn!(self, |T, buffer| { buffer.reserve(additional) })
268 }
269
270 fn len(&self) -> usize {
271 delegate_fn!(self, |T, buffer| { buffer.len() })
272 }
273
274 pub fn extend<I, V: NativeDecimalType>(&mut self, iter: I)
275 where
276 I: Iterator<Item = V>,
277 {
278 delegate_fn!(self, |T, buffer| {
279 buffer.extend(
280 iter.map(|x| <T as BigCast>::from(x).vortex_expect("decimal conversion failure")),
281 )
282 })
283 }
284}
285
286macro_rules! impl_from_buffer {
287 ($T:ty, $variant:ident) => {
288 impl From<BufferMut<$T>> for DecimalBuffer {
289 fn from(buffer: BufferMut<$T>) -> Self {
290 Self::$variant(buffer)
291 }
292 }
293 };
294}
295
296impl_from_buffer!(i8, I8);
297impl_from_buffer!(i16, I16);
298impl_from_buffer!(i32, I32);
299impl_from_buffer!(i64, I64);
300impl_from_buffer!(i128, I128);
301impl_from_buffer!(i256, I256);
302
303impl Default for DecimalBuffer {
304 fn default() -> Self {
305 Self::I8(BufferMut::<i8>::empty())
306 }
307}
308
309#[cfg(test)]
310mod tests {
311 use crate::LEGACY_SESSION;
312 use crate::VortexSessionExecute;
313 use crate::assert_arrays_eq;
314 use crate::builders::ArrayBuilder;
315 use crate::builders::DecimalBuilder;
316 use crate::builders::decimal::DecimalArray;
317 use crate::dtype::DecimalDType;
318
319 #[test]
320 fn test_mixed_extend() {
321 let values = 42i8;
322
323 let mut i8s = DecimalBuilder::new::<i8>(DecimalDType::new(2, 1), false.into());
324 for v in 0..values {
325 i8s.append_value(v);
326 }
327 let i8s = i8s.finish();
328
329 let mut i128s = DecimalBuilder::new::<i128>(DecimalDType::new(2, 1), false.into());
330 i128s.extend_from_array(&i8s);
331 let i128s = i128s.finish();
332
333 for i in 0..i8s.len() {
334 assert_eq!(
335 i8s.execute_scalar(i, &mut LEGACY_SESSION.create_execution_ctx())
336 .unwrap(),
337 i128s
338 .execute_scalar(i, &mut LEGACY_SESSION.create_execution_ctx())
339 .unwrap()
340 );
341 }
342 }
343
344 #[test]
345 fn test_append_scalar() {
346 use crate::scalar::Scalar;
347
348 let mut builder = DecimalBuilder::new::<i64>(DecimalDType::new(10, 2), true.into());
350 builder.append_value(1234i64);
351 builder.append_value(5678i64);
352 builder.append_null();
353
354 let array = builder.finish();
355 let expected = DecimalArray::from_option_iter(
356 [Some(1234i64), Some(5678), None],
357 DecimalDType::new(10, 2),
358 );
359 assert_arrays_eq!(&array, &expected);
360
361 let mut builder2 = DecimalBuilder::new::<i64>(DecimalDType::new(10, 2), true.into());
363 for i in 0..array.len() {
364 let scalar = array
365 .execute_scalar(i, &mut LEGACY_SESSION.create_execution_ctx())
366 .unwrap();
367 builder2.append_scalar(&scalar).unwrap();
368 }
369
370 let array2 = builder2.finish();
371 assert_arrays_eq!(&array2, &array);
372
373 let mut builder = DecimalBuilder::new::<i64>(DecimalDType::new(10, 2), false.into());
375 let wrong_scalar = Scalar::from(true);
376 assert!(builder.append_scalar(&wrong_scalar).is_err());
377 }
378}