1use std::any::Any;
5
6use vortex_buffer::BufferMut;
7use vortex_dtype::BigCast;
8use vortex_dtype::DType;
9use vortex_dtype::DecimalDType;
10use vortex_dtype::NativeDecimalType;
11use vortex_dtype::Nullability;
12use vortex_dtype::i256;
13use vortex_dtype::match_each_decimal_value;
14use vortex_dtype::match_each_decimal_value_type;
15use vortex_error::VortexExpect;
16use vortex_error::VortexResult;
17use vortex_error::vortex_ensure;
18use vortex_error::vortex_err;
19use vortex_error::vortex_panic;
20use vortex_mask::Mask;
21
22use crate::Array;
23use crate::ArrayRef;
24use crate::IntoArray;
25use crate::ToCanonical;
26use crate::arrays::DecimalArray;
27use crate::builders::ArrayBuilder;
28use crate::builders::DEFAULT_BUILDER_CAPACITY;
29use crate::builders::LazyBitBufferBuilder;
30use crate::canonical::Canonical;
31use crate::scalar::DecimalValue;
32use crate::scalar::Scalar;
33
34pub struct DecimalBuilder {
40 dtype: DType,
41 values: DecimalBuffer,
42 nulls: LazyBitBufferBuilder,
43}
44
45enum DecimalBuffer {
51 I8(BufferMut<i8>),
52 I16(BufferMut<i16>),
53 I32(BufferMut<i32>),
54 I64(BufferMut<i64>),
55 I128(BufferMut<i128>),
56 I256(BufferMut<i256>),
57}
58
59macro_rules! delegate_fn {
60 ($self:expr, | $tname:ident, $buffer:ident | $body:block) => {{
61 #[allow(unused)]
62 match $self {
63 DecimalBuffer::I8(buffer) => {
64 type $tname = i8;
65 let $buffer = buffer;
66 $body
67 }
68 DecimalBuffer::I16(buffer) => {
69 type $tname = i16;
70 let $buffer = buffer;
71 $body
72 }
73 DecimalBuffer::I32(buffer) => {
74 type $tname = i32;
75 let $buffer = buffer;
76 $body
77 }
78 DecimalBuffer::I64(buffer) => {
79 type $tname = i64;
80 let $buffer = buffer;
81 $body
82 }
83 DecimalBuffer::I128(buffer) => {
84 type $tname = i128;
85 let $buffer = buffer;
86 $body
87 }
88 DecimalBuffer::I256(buffer) => {
89 type $tname = i256;
90 let $buffer = buffer;
91 $body
92 }
93 }
94 }};
95}
96
97impl DecimalBuilder {
98 pub fn new<T: NativeDecimalType>(decimal: DecimalDType, nullability: Nullability) -> Self {
100 Self::with_capacity::<T>(DEFAULT_BUILDER_CAPACITY, decimal, nullability)
101 }
102
103 pub fn with_capacity<T: NativeDecimalType>(
105 capacity: usize,
106 decimal: DecimalDType,
107 nullability: Nullability,
108 ) -> Self {
109 Self {
110 dtype: DType::Decimal(decimal, nullability),
111 values: match_each_decimal_value_type!(T::DECIMAL_TYPE, |D| {
112 DecimalBuffer::from(BufferMut::<D>::with_capacity(capacity))
113 }),
114 nulls: LazyBitBufferBuilder::new(capacity),
115 }
116 }
117
118 pub fn append_value<V: NativeDecimalType>(&mut self, value: V) {
120 self.values.push(value);
121 self.nulls.append_non_null();
122 }
123
124 pub fn finish_into_decimal(&mut self) -> DecimalArray {
126 let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
127
128 let decimal_dtype = *self.decimal_dtype();
129
130 delegate_fn!(std::mem::take(&mut self.values), |T, values| {
131 DecimalArray::new::<T>(values.freeze(), decimal_dtype, validity)
132 })
133 }
134
135 pub fn decimal_dtype(&self) -> &DecimalDType {
137 let DType::Decimal(decimal_dtype, _) = &self.dtype else {
138 vortex_panic!("`DecimalBuilder` somehow had dtype {}", self.dtype);
139 };
140
141 decimal_dtype
142 }
143}
144
145impl ArrayBuilder for DecimalBuilder {
146 fn as_any(&self) -> &dyn Any {
147 self
148 }
149
150 fn as_any_mut(&mut self) -> &mut dyn Any {
151 self
152 }
153
154 fn dtype(&self) -> &DType {
155 &self.dtype
156 }
157
158 fn len(&self) -> usize {
159 self.values.len()
160 }
161
162 fn append_zeros(&mut self, n: usize) {
163 self.values.push_n(0, n);
164 self.nulls.append_n_non_nulls(n);
165 }
166
167 unsafe fn append_nulls_unchecked(&mut self, n: usize) {
168 self.values.push_n(0, n);
169 self.nulls.append_n_nulls(n);
170 }
171
172 fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
173 vortex_ensure!(
174 scalar.dtype() == self.dtype(),
175 "DecimalBuilder expected scalar with dtype {}, got {}",
176 self.dtype(),
177 scalar.dtype()
178 );
179
180 match scalar.as_decimal().decimal_value() {
181 None => self.append_null(),
182 Some(v) => match_each_decimal_value!(v, |dec_val| {
183 self.append_value(dec_val);
184 }),
185 }
186
187 Ok(())
188 }
189
190 unsafe fn extend_from_array_unchecked(&mut self, array: &dyn Array) {
191 let decimal_array = array.to_decimal();
192
193 match_each_decimal_value_type!(decimal_array.values_type(), |D| {
194 self.values
197 .extend(decimal_array.buffer::<D>().iter().copied());
198 });
199
200 self.nulls.append_validity_mask(
201 decimal_array
202 .validity_mask()
203 .vortex_expect("validity_mask in extend_from_array_unchecked"),
204 );
205 }
206
207 fn reserve_exact(&mut self, additional: usize) {
208 self.values.reserve(additional);
209 self.nulls.reserve_exact(additional);
210 }
211
212 unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
213 self.nulls = LazyBitBufferBuilder::new(validity.len());
214 self.nulls.append_validity_mask(validity);
215 }
216
217 fn finish(&mut self) -> ArrayRef {
218 self.finish_into_decimal().into_array()
219 }
220
221 fn finish_into_canonical(&mut self) -> Canonical {
222 Canonical::Decimal(self.finish_into_decimal())
223 }
224}
225
226impl DecimalBuffer {
227 fn push<V: NativeDecimalType>(&mut self, value: V) {
228 delegate_fn!(self, |T, buffer| {
229 buffer.push(
230 <T as BigCast>::from(value)
231 .ok_or_else(|| {
232 vortex_err!(
233 "decimal conversion failure {:?}, type: {:?} to {:?}",
234 value,
235 V::DECIMAL_TYPE,
236 T::DECIMAL_TYPE,
237 )
238 })
239 .vortex_expect("operation should succeed in builder"),
240 )
241 });
242 }
243
244 fn push_n<V: NativeDecimalType>(&mut self, value: V, n: usize) {
245 delegate_fn!(self, |T, buffer| {
246 buffer.push_n(
247 <T as BigCast>::from(value).vortex_expect("decimal conversion failure"),
248 n,
249 )
250 });
251 }
252
253 fn reserve(&mut self, additional: usize) {
254 delegate_fn!(self, |T, buffer| { buffer.reserve(additional) })
255 }
256
257 fn len(&self) -> usize {
258 delegate_fn!(self, |T, buffer| { buffer.len() })
259 }
260
261 pub fn extend<I, V: NativeDecimalType>(&mut self, iter: I)
262 where
263 I: Iterator<Item = V>,
264 {
265 delegate_fn!(self, |T, buffer| {
266 buffer.extend(
267 iter.map(|x| <T as BigCast>::from(x).vortex_expect("decimal conversion failure")),
268 )
269 })
270 }
271}
272
273macro_rules! impl_from_buffer {
274 ($T:ty, $variant:ident) => {
275 impl From<BufferMut<$T>> for DecimalBuffer {
276 fn from(buffer: BufferMut<$T>) -> Self {
277 Self::$variant(buffer)
278 }
279 }
280 };
281}
282
283impl_from_buffer!(i8, I8);
284impl_from_buffer!(i16, I16);
285impl_from_buffer!(i32, I32);
286impl_from_buffer!(i64, I64);
287impl_from_buffer!(i128, I128);
288impl_from_buffer!(i256, I256);
289
290impl Default for DecimalBuffer {
291 fn default() -> Self {
292 Self::I8(BufferMut::<i8>::empty())
293 }
294}
295
296#[cfg(test)]
297mod tests {
298 use vortex_dtype::DecimalDType;
299
300 use crate::arrays::DecimalArray;
301 use crate::assert_arrays_eq;
302 use crate::builders::ArrayBuilder;
303 use crate::builders::DecimalBuilder;
304
305 #[test]
306 fn test_mixed_extend() {
307 let values = 42i8;
308
309 let mut i8s = DecimalBuilder::new::<i8>(DecimalDType::new(2, 1), false.into());
310 for v in 0..values {
311 i8s.append_value(v);
312 }
313 let i8s = i8s.finish();
314
315 let mut i128s = DecimalBuilder::new::<i128>(DecimalDType::new(2, 1), false.into());
316 i128s.extend_from_array(&i8s);
317 let i128s = i128s.finish();
318
319 for i in 0..i8s.len() {
320 assert_eq!(i8s.scalar_at(i).unwrap(), i128s.scalar_at(i).unwrap());
321 }
322 }
323
324 #[test]
325 fn test_append_scalar() {
326 use crate::scalar::Scalar;
327
328 let mut builder = DecimalBuilder::new::<i64>(DecimalDType::new(10, 2), true.into());
330 builder.append_value(1234i64);
331 builder.append_value(5678i64);
332 builder.append_null();
333
334 let array = builder.finish();
335 let expected = DecimalArray::from_option_iter(
336 [Some(1234i64), Some(5678), None],
337 DecimalDType::new(10, 2),
338 );
339 assert_arrays_eq!(&array, &expected);
340
341 let mut builder2 = DecimalBuilder::new::<i64>(DecimalDType::new(10, 2), true.into());
343 for i in 0..array.len() {
344 let scalar = array.scalar_at(i).unwrap();
345 builder2.append_scalar(&scalar).unwrap();
346 }
347
348 let array2 = builder2.finish();
349 assert_arrays_eq!(&array2, &array);
350
351 let mut builder = DecimalBuilder::new::<i64>(DecimalDType::new(10, 2), false.into());
353 let wrong_scalar = Scalar::from(true);
354 assert!(builder.append_scalar(&wrong_scalar).is_err());
355 }
356}