1use std::any::Any;
5
6use vortex_buffer::BufferMut;
7use vortex_dtype::{DType, DecimalDType, Nullability};
8use vortex_error::{VortexExpect, VortexResult, vortex_ensure, vortex_panic};
9use vortex_mask::Mask;
10use vortex_scalar::{
11 BigCast, DecimalValue, NativeDecimalType, Scalar, i256, match_each_decimal_value,
12 match_each_decimal_value_type,
13};
14
15use crate::arrays::DecimalArray;
16use crate::builders::{ArrayBuilder, DEFAULT_BUILDER_CAPACITY, LazyNullBufferBuilder};
17use crate::canonical::Canonical;
18use crate::{Array, ArrayRef, IntoArray, ToCanonical};
19
20pub struct DecimalBuilder {
26 dtype: DType,
27 values: DecimalBuffer,
28 nulls: LazyNullBufferBuilder,
29}
30
31enum DecimalBuffer {
37 I8(BufferMut<i8>),
38 I16(BufferMut<i16>),
39 I32(BufferMut<i32>),
40 I64(BufferMut<i64>),
41 I128(BufferMut<i128>),
42 I256(BufferMut<i256>),
43}
44
45macro_rules! delegate_fn {
46 ($self:expr, | $tname:ident, $buffer:ident | $body:block) => {{
47 #[allow(unused)]
48 match $self {
49 DecimalBuffer::I8(buffer) => {
50 type $tname = i8;
51 let $buffer = buffer;
52 $body
53 }
54 DecimalBuffer::I16(buffer) => {
55 type $tname = i16;
56 let $buffer = buffer;
57 $body
58 }
59 DecimalBuffer::I32(buffer) => {
60 type $tname = i32;
61 let $buffer = buffer;
62 $body
63 }
64 DecimalBuffer::I64(buffer) => {
65 type $tname = i64;
66 let $buffer = buffer;
67 $body
68 }
69 DecimalBuffer::I128(buffer) => {
70 type $tname = i128;
71 let $buffer = buffer;
72 $body
73 }
74 DecimalBuffer::I256(buffer) => {
75 type $tname = i256;
76 let $buffer = buffer;
77 $body
78 }
79 }
80 }};
81}
82
83impl DecimalBuilder {
84 pub fn new<T: NativeDecimalType>(precision: u8, scale: i8, nullability: Nullability) -> Self {
86 Self::with_capacity::<T>(
87 DEFAULT_BUILDER_CAPACITY,
88 DecimalDType::new(precision, scale),
89 nullability,
90 )
91 }
92
93 pub fn with_capacity<T: NativeDecimalType>(
95 capacity: usize,
96 decimal: DecimalDType,
97 nullability: Nullability,
98 ) -> Self {
99 Self {
100 dtype: DType::Decimal(decimal, nullability),
101 values: match_each_decimal_value_type!(T::VALUES_TYPE, |D| {
102 DecimalBuffer::from(BufferMut::<D>::with_capacity(capacity))
103 }),
104 nulls: LazyNullBufferBuilder::new(capacity),
105 }
106 }
107
108 pub fn append_value<V: NativeDecimalType>(&mut self, value: V) {
110 self.values.push(value);
111 self.nulls.append_non_null();
112 }
113
114 pub fn finish_into_decimal(&mut self) -> DecimalArray {
116 let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
117
118 let decimal_dtype = *self.decimal_dtype();
119
120 delegate_fn!(std::mem::take(&mut self.values), |T, values| {
121 DecimalArray::new::<T>(values.freeze(), decimal_dtype, validity)
122 })
123 }
124
125 pub fn decimal_dtype(&self) -> &DecimalDType {
127 let DType::Decimal(decimal_dtype, _) = &self.dtype else {
128 vortex_panic!("`DecimalBuilder` somehow had dtype {}", self.dtype);
129 };
130
131 decimal_dtype
132 }
133}
134
135impl ArrayBuilder for DecimalBuilder {
136 fn as_any(&self) -> &dyn Any {
137 self
138 }
139
140 fn as_any_mut(&mut self) -> &mut dyn Any {
141 self
142 }
143
144 fn dtype(&self) -> &DType {
145 &self.dtype
146 }
147
148 fn len(&self) -> usize {
149 self.values.len()
150 }
151
152 fn append_zeros(&mut self, n: usize) {
153 self.values.push_n(0, n);
154 self.nulls.append_n_non_nulls(n);
155 }
156
157 unsafe fn append_nulls_unchecked(&mut self, n: usize) {
158 self.values.push_n(0, n);
159 self.nulls.append_n_nulls(n);
160 }
161
162 fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
163 vortex_ensure!(
164 scalar.dtype() == self.dtype(),
165 "DecimalBuilder expected scalar with dtype {:?}, got {:?}",
166 self.dtype(),
167 scalar.dtype()
168 );
169
170 match scalar.as_decimal().decimal_value() {
171 None => self.append_null(),
172 Some(v) => match_each_decimal_value!(v, |dec_val| {
173 self.append_value(dec_val);
174 }),
175 }
176
177 Ok(())
178 }
179
180 unsafe fn extend_from_array_unchecked(&mut self, array: &dyn Array) {
181 let decimal_array = array.to_decimal();
182
183 match_each_decimal_value_type!(decimal_array.values_type(), |D| {
184 self.values
187 .extend(decimal_array.buffer::<D>().iter().copied());
188 });
189
190 self.nulls
191 .append_validity_mask(decimal_array.validity_mask());
192 }
193
194 fn ensure_capacity(&mut self, capacity: usize) {
195 if capacity > self.values.capacity() {
196 self.values.reserve(capacity - self.values.len());
197 self.nulls.ensure_capacity(capacity);
198 }
199 }
200
201 fn set_validity(&mut self, validity: Mask) {
202 self.nulls = LazyNullBufferBuilder::new(validity.len());
203 self.nulls.append_validity_mask(validity);
204 }
205
206 fn finish(&mut self) -> ArrayRef {
207 self.finish_into_decimal().into_array()
208 }
209
210 fn finish_into_canonical(&mut self) -> Canonical {
211 Canonical::Decimal(self.finish_into_decimal())
212 }
213}
214
215impl DecimalBuffer {
216 fn push<V: NativeDecimalType>(&mut self, value: V) {
217 delegate_fn!(self, |T, buffer| {
218 buffer.push(<T as BigCast>::from(value).vortex_expect("decimal conversion failure"))
219 });
220 }
221
222 fn push_n<V: NativeDecimalType>(&mut self, value: V, n: usize) {
223 delegate_fn!(self, |T, buffer| {
224 buffer.push_n(
225 <T as BigCast>::from(value).vortex_expect("decimal conversion failure"),
226 n,
227 )
228 });
229 }
230
231 fn reserve(&mut self, additional: usize) {
232 delegate_fn!(self, |T, buffer| { buffer.reserve(additional) })
233 }
234
235 fn capacity(&self) -> usize {
236 delegate_fn!(self, |T, buffer| { buffer.capacity() })
237 }
238
239 fn len(&self) -> usize {
240 delegate_fn!(self, |T, buffer| { buffer.len() })
241 }
242
243 pub fn extend<I, V: NativeDecimalType>(&mut self, iter: I)
244 where
245 I: Iterator<Item = V>,
246 {
247 delegate_fn!(self, |T, buffer| {
248 buffer.extend(
249 iter.map(|x| <T as BigCast>::from(x).vortex_expect("decimal conversion failure")),
250 )
251 })
252 }
253}
254
255macro_rules! impl_from_buffer {
256 ($T:ty, $variant:ident) => {
257 impl From<BufferMut<$T>> for DecimalBuffer {
258 fn from(buffer: BufferMut<$T>) -> Self {
259 Self::$variant(buffer)
260 }
261 }
262 };
263}
264
265impl_from_buffer!(i8, I8);
266impl_from_buffer!(i16, I16);
267impl_from_buffer!(i32, I32);
268impl_from_buffer!(i64, I64);
269impl_from_buffer!(i128, I128);
270impl_from_buffer!(i256, I256);
271
272impl Default for DecimalBuffer {
273 fn default() -> Self {
274 Self::I8(BufferMut::<i8>::empty())
275 }
276}
277
278#[cfg(test)]
279mod tests {
280 use crate::builders::{ArrayBuilder, DecimalBuilder};
281
282 #[test]
283 fn test_mixed_extend() {
284 let values = 42i8;
285
286 let mut i8s = DecimalBuilder::new::<i8>(2, 1, false.into());
287 for v in 0..values {
288 i8s.append_value(v);
289 }
290 let i8s = i8s.finish();
291
292 let mut i128s = DecimalBuilder::new::<i128>(2, 1, false.into());
293 i128s.extend_from_array(&i8s);
294 let i128s = i128s.finish();
295
296 for i in 0..i8s.len() {
297 assert_eq!(i8s.scalar_at(i), i128s.scalar_at(i));
298 }
299 }
300
301 #[test]
302 fn test_append_scalar() {
303 use vortex_scalar::Scalar;
304
305 let mut builder = DecimalBuilder::new::<i64>(10, 2, true.into());
307 builder.append_value(1234i64);
308 builder.append_value(5678i64);
309 builder.append_null();
310
311 let array = builder.finish();
312 assert_eq!(array.len(), 3);
313
314 let scalar0 = array.scalar_at(0);
316 let decimal0 = scalar0.as_decimal();
317 assert!(decimal0.decimal_value().is_some());
318 let scalar1 = array.scalar_at(1);
321 let decimal1 = scalar1.as_decimal();
322 assert!(decimal1.decimal_value().is_some());
323
324 let scalar2 = array.scalar_at(2);
325 let decimal2 = scalar2.as_decimal();
326 assert!(decimal2.decimal_value().is_none()); let mut builder2 = DecimalBuilder::new::<i64>(10, 2, true.into());
330 for i in 0..array.len() {
331 let scalar = array.scalar_at(i);
332 builder2.append_scalar(&scalar).unwrap();
333 }
334
335 let array2 = builder2.finish();
336 assert_eq!(array2.len(), 3);
337
338 for i in 0..3 {
340 assert_eq!(array.scalar_at(i), array2.scalar_at(i));
341 }
342
343 let mut builder = DecimalBuilder::new::<i64>(10, 2, false.into());
345 let wrong_scalar = Scalar::from(true);
346 assert!(builder.append_scalar(&wrong_scalar).is_err());
347 }
348}