1use std::any::Any;
5
6use vortex_buffer::BufferMut;
7use vortex_dtype::{
8 BigCast, DType, DecimalDType, NativeDecimalType, Nullability, match_each_decimal_value,
9 match_each_decimal_value_type,
10};
11use vortex_error::{
12 VortexExpect, VortexResult, VortexUnwrap, vortex_ensure, vortex_err, vortex_panic,
13};
14use vortex_mask::Mask;
15use vortex_scalar::{DecimalValue, Scalar, i256};
16
17use crate::arrays::DecimalArray;
18use crate::builders::{ArrayBuilder, DEFAULT_BUILDER_CAPACITY, LazyBitBufferBuilder};
19use crate::canonical::Canonical;
20use crate::{Array, ArrayRef, IntoArray, ToCanonical};
21
22pub struct DecimalBuilder {
28 dtype: DType,
29 values: DecimalBuffer,
30 nulls: LazyBitBufferBuilder,
31}
32
33enum DecimalBuffer {
39 I8(BufferMut<i8>),
40 I16(BufferMut<i16>),
41 I32(BufferMut<i32>),
42 I64(BufferMut<i64>),
43 I128(BufferMut<i128>),
44 I256(BufferMut<i256>),
45}
46
/// Dispatches `$body` over whichever typed buffer a [`DecimalBuffer`] currently holds.
///
/// Within `$body`, `$tname` is a type alias for the active variant's element type and `$buffer`
/// is bound to that variant's payload. Whether the payload is owned, `&`, or `&mut` follows from
/// how `$self` is passed to the macro.
macro_rules! delegate_fn {
    ($self:expr, | $tname:ident, $buffer:ident | $body:block) => {{
        // Some callers never mention the type alias (or the binding), hence the blanket allow.
        #[allow(unused)]
        match $self {
            DecimalBuffer::I8($buffer) => {
                type $tname = i8;
                $body
            }
            DecimalBuffer::I16($buffer) => {
                type $tname = i16;
                $body
            }
            DecimalBuffer::I32($buffer) => {
                type $tname = i32;
                $body
            }
            DecimalBuffer::I64($buffer) => {
                type $tname = i64;
                $body
            }
            DecimalBuffer::I128($buffer) => {
                type $tname = i128;
                $body
            }
            DecimalBuffer::I256($buffer) => {
                type $tname = i256;
                $body
            }
        }
    }};
}
84
85impl DecimalBuilder {
86 pub fn new<T: NativeDecimalType>(decimal: DecimalDType, nullability: Nullability) -> Self {
88 Self::with_capacity::<T>(DEFAULT_BUILDER_CAPACITY, decimal, nullability)
89 }
90
91 pub fn with_capacity<T: NativeDecimalType>(
93 capacity: usize,
94 decimal: DecimalDType,
95 nullability: Nullability,
96 ) -> Self {
97 Self {
98 dtype: DType::Decimal(decimal, nullability),
99 values: match_each_decimal_value_type!(T::DECIMAL_TYPE, |D| {
100 DecimalBuffer::from(BufferMut::<D>::with_capacity(capacity))
101 }),
102 nulls: LazyBitBufferBuilder::new(capacity),
103 }
104 }
105
106 pub fn append_value<V: NativeDecimalType>(&mut self, value: V) {
108 self.values.push(value);
109 self.nulls.append_non_null();
110 }
111
112 pub fn finish_into_decimal(&mut self) -> DecimalArray {
114 let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
115
116 let decimal_dtype = *self.decimal_dtype();
117
118 delegate_fn!(std::mem::take(&mut self.values), |T, values| {
119 DecimalArray::new::<T>(values.freeze(), decimal_dtype, validity)
120 })
121 }
122
123 pub fn decimal_dtype(&self) -> &DecimalDType {
125 let DType::Decimal(decimal_dtype, _) = &self.dtype else {
126 vortex_panic!("`DecimalBuilder` somehow had dtype {}", self.dtype);
127 };
128
129 decimal_dtype
130 }
131}
132
133impl ArrayBuilder for DecimalBuilder {
134 fn as_any(&self) -> &dyn Any {
135 self
136 }
137
138 fn as_any_mut(&mut self) -> &mut dyn Any {
139 self
140 }
141
142 fn dtype(&self) -> &DType {
143 &self.dtype
144 }
145
146 fn len(&self) -> usize {
147 self.values.len()
148 }
149
150 fn append_zeros(&mut self, n: usize) {
151 self.values.push_n(0, n);
152 self.nulls.append_n_non_nulls(n);
153 }
154
155 unsafe fn append_nulls_unchecked(&mut self, n: usize) {
156 self.values.push_n(0, n);
157 self.nulls.append_n_nulls(n);
158 }
159
160 fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
161 vortex_ensure!(
162 scalar.dtype() == self.dtype(),
163 "DecimalBuilder expected scalar with dtype {:?}, got {:?}",
164 self.dtype(),
165 scalar.dtype()
166 );
167
168 match scalar.as_decimal().decimal_value() {
169 None => self.append_null(),
170 Some(v) => match_each_decimal_value!(v, |dec_val| {
171 self.append_value(dec_val);
172 }),
173 }
174
175 Ok(())
176 }
177
178 unsafe fn extend_from_array_unchecked(&mut self, array: &dyn Array) {
179 let decimal_array = array.to_decimal();
180
181 match_each_decimal_value_type!(decimal_array.values_type(), |D| {
182 self.values
185 .extend(decimal_array.buffer::<D>().iter().copied());
186 });
187
188 self.nulls
189 .append_validity_mask(decimal_array.validity_mask());
190 }
191
192 fn reserve_exact(&mut self, additional: usize) {
193 self.values.reserve(additional);
194 self.nulls.reserve_exact(additional);
195 }
196
197 unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
198 self.nulls = LazyBitBufferBuilder::new(validity.len());
199 self.nulls.append_validity_mask(validity);
200 }
201
202 fn finish(&mut self) -> ArrayRef {
203 self.finish_into_decimal().into_array()
204 }
205
206 fn finish_into_canonical(&mut self) -> Canonical {
207 Canonical::Decimal(self.finish_into_decimal())
208 }
209}
210
211impl DecimalBuffer {
212 fn push<V: NativeDecimalType>(&mut self, value: V) {
213 delegate_fn!(self, |T, buffer| {
214 buffer.push(
215 <T as BigCast>::from(value)
216 .ok_or_else(|| {
217 vortex_err!(
218 "decimal conversion failure {:?}, type: {:?} to {:?}",
219 value,
220 V::DECIMAL_TYPE,
221 T::DECIMAL_TYPE,
222 )
223 })
224 .vortex_unwrap(),
225 )
226 });
227 }
228
229 fn push_n<V: NativeDecimalType>(&mut self, value: V, n: usize) {
230 delegate_fn!(self, |T, buffer| {
231 buffer.push_n(
232 <T as BigCast>::from(value).vortex_expect("decimal conversion failure"),
233 n,
234 )
235 });
236 }
237
238 fn reserve(&mut self, additional: usize) {
239 delegate_fn!(self, |T, buffer| { buffer.reserve(additional) })
240 }
241
242 fn len(&self) -> usize {
243 delegate_fn!(self, |T, buffer| { buffer.len() })
244 }
245
246 pub fn extend<I, V: NativeDecimalType>(&mut self, iter: I)
247 where
248 I: Iterator<Item = V>,
249 {
250 delegate_fn!(self, |T, buffer| {
251 buffer.extend(
252 iter.map(|x| <T as BigCast>::from(x).vortex_expect("decimal conversion failure")),
253 )
254 })
255 }
256}
257
258macro_rules! impl_from_buffer {
259 ($T:ty, $variant:ident) => {
260 impl From<BufferMut<$T>> for DecimalBuffer {
261 fn from(buffer: BufferMut<$T>) -> Self {
262 Self::$variant(buffer)
263 }
264 }
265 };
266}
267
268impl_from_buffer!(i8, I8);
269impl_from_buffer!(i16, I16);
270impl_from_buffer!(i32, I32);
271impl_from_buffer!(i64, I64);
272impl_from_buffer!(i128, I128);
273impl_from_buffer!(i256, I256);
274
275impl Default for DecimalBuffer {
276 fn default() -> Self {
277 Self::I8(BufferMut::<i8>::empty())
278 }
279}
280
#[cfg(test)]
mod tests {
    use vortex_dtype::DecimalDType;

    use crate::arrays::DecimalArray;
    use crate::assert_arrays_eq;
    use crate::builders::{ArrayBuilder, DecimalBuilder};

    /// Extending an `i128`-backed builder from an `i8`-backed array must preserve every value.
    #[test]
    fn test_mixed_extend() {
        let dtype = DecimalDType::new(2, 1);

        let mut narrow = DecimalBuilder::new::<i8>(dtype, false.into());
        (0..42i8).for_each(|v| narrow.append_value(v));
        let narrow = narrow.finish();

        let mut wide = DecimalBuilder::new::<i128>(dtype, false.into());
        wide.extend_from_array(&narrow);
        let wide = wide.finish();

        for idx in 0..narrow.len() {
            assert_eq!(narrow.scalar_at(idx), wide.scalar_at(idx));
        }
    }

    /// Round-trips values through `append_scalar` and rejects a scalar of the wrong dtype.
    #[test]
    fn test_append_scalar() {
        use vortex_scalar::Scalar;

        let dtype = DecimalDType::new(10, 2);

        // Build a reference array directly from native values plus a null.
        let mut direct = DecimalBuilder::new::<i64>(dtype, true.into());
        direct.append_value(1234i64);
        direct.append_value(5678i64);
        direct.append_null();

        let array = direct.finish();
        let expected = DecimalArray::from_option_iter([Some(1234i64), Some(5678), None], dtype);
        assert_arrays_eq!(&array, &expected);

        // Rebuild the same array one scalar at a time.
        let mut via_scalars = DecimalBuilder::new::<i64>(dtype, true.into());
        for idx in 0..array.len() {
            let scalar = array.scalar_at(idx);
            via_scalars.append_scalar(&scalar).unwrap();
        }

        let array2 = via_scalars.finish();
        assert_arrays_eq!(&array2, &array);

        // A non-decimal scalar must be refused.
        let mut strict = DecimalBuilder::new::<i64>(dtype, false.into());
        let wrong_scalar = Scalar::from(true);
        assert!(strict.append_scalar(&wrong_scalar).is_err());
    }
}