1use std::any::Any;
2
3use vortex_buffer::{Buffer, BufferMut};
4use vortex_dtype::{DType, DecimalDType, Nullability};
5use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_panic};
6use vortex_mask::Mask;
7use vortex_scalar::{BigCast, NativeDecimalType, i256, match_each_decimal_value_type};
8
9use crate::arrays::{BoolArray, DecimalArray};
10use crate::builders::ArrayBuilder;
11use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
12use crate::validity::Validity;
13use crate::{Array, ArrayRef, IntoArray, ToCanonical};
14
15enum DecimalBuffer {
20 I8(BufferMut<i8>),
21 I16(BufferMut<i16>),
22 I32(BufferMut<i32>),
23 I64(BufferMut<i64>),
24 I128(BufferMut<i128>),
25 I256(BufferMut<i256>),
26}
27
28impl Default for DecimalBuffer {
29 fn default() -> Self {
30 Self::I8(BufferMut::<i8>::empty())
31 }
32}
33
34macro_rules! impl_from_buffer {
35 ($typ:ty, $variant:ident) => {
36 impl From<BufferMut<$typ>> for DecimalBuffer {
37 fn from(buffer: BufferMut<$typ>) -> Self {
38 Self::$variant(buffer)
39 }
40 }
41 };
42}
43impl_from_buffer!(i8, I8);
44impl_from_buffer!(i16, I16);
45impl_from_buffer!(i32, I32);
46impl_from_buffer!(i64, I64);
47impl_from_buffer!(i128, I128);
48impl_from_buffer!(i256, I256);
49
/// Runs `$body` against whichever typed buffer a [`DecimalBuffer`] holds.
///
/// Inside `$body`, `$tname` is a type alias for the native integer type of the
/// active variant and `$buffer` is bound to that variant's payload (by value,
/// `&`, or `&mut`, following how `$self` was passed in).
macro_rules! delegate_fn {
    ($self:expr, | $tname:ident, $buffer:ident | $body:block) => {{
        // Not every body uses both the alias and the binding.
        #[allow(unused)]
        match $self {
            DecimalBuffer::I8($buffer) => {
                type $tname = i8;
                $body
            }
            DecimalBuffer::I16($buffer) => {
                type $tname = i16;
                $body
            }
            DecimalBuffer::I32($buffer) => {
                type $tname = i32;
                $body
            }
            DecimalBuffer::I64($buffer) => {
                type $tname = i64;
                $body
            }
            DecimalBuffer::I128($buffer) => {
                type $tname = i128;
                $body
            }
            DecimalBuffer::I256($buffer) => {
                type $tname = i256;
                $body
            }
        }
    }};
}
87
88impl DecimalBuffer {
89 fn push<V: NativeDecimalType>(&mut self, value: V) {
90 delegate_fn!(self, |T, buffer| {
91 buffer.push(<T as BigCast>::from(value).vortex_expect("decimal conversion failure"))
92 });
93 }
94
95 fn push_n<V: NativeDecimalType>(&mut self, value: V, n: usize) {
96 delegate_fn!(self, |T, buffer| {
97 buffer.push_n(
98 <T as BigCast>::from(value).vortex_expect("decimal conversion failure"),
99 n,
100 )
101 });
102 }
103
104 fn reserve(&mut self, additional: usize) {
105 delegate_fn!(self, |T, buffer| { buffer.reserve(additional) })
106 }
107
108 fn capacity(&self) -> usize {
109 delegate_fn!(self, |T, buffer| { buffer.capacity() })
110 }
111
112 fn len(&self) -> usize {
113 delegate_fn!(self, |T, buffer| { buffer.len() })
114 }
115
116 pub fn extend<I, V: NativeDecimalType>(&mut self, iter: I)
117 where
118 I: Iterator<Item = V>,
119 {
120 delegate_fn!(self, |T, buffer| {
121 buffer.extend(
122 iter.map(|x| <T as BigCast>::from(x).vortex_expect("decimal conversion failure")),
123 )
124 })
125 }
126}
127
128pub struct DecimalBuilder {
134 values: DecimalBuffer,
135 nulls: LazyNullBufferBuilder,
136 dtype: DType,
137}
138
/// Capacity (in values) used by `DecimalBuilder::new` when none is given.
const DEFAULT_BUILDER_CAPACITY: usize = 1 << 10;
140
141impl DecimalBuilder {
142 pub fn new<T: NativeDecimalType>(precision: u8, scale: i8, nullability: Nullability) -> Self {
143 Self::with_capacity::<T>(
144 DEFAULT_BUILDER_CAPACITY,
145 DecimalDType::new(precision, scale),
146 nullability,
147 )
148 }
149
150 pub fn with_capacity<T: NativeDecimalType>(
151 capacity: usize,
152 decimal: DecimalDType,
153 nullability: Nullability,
154 ) -> Self {
155 Self {
156 values: match_each_decimal_value_type!(T::VALUES_TYPE, |D| {
157 DecimalBuffer::from(BufferMut::<D>::with_capacity(capacity))
158 }),
159 nulls: LazyNullBufferBuilder::new(capacity),
160 dtype: DType::Decimal(decimal, nullability),
161 }
162 }
163}
164
165impl DecimalBuilder {
166 fn extend_with_validity_mask(&mut self, validity_mask: Mask) {
167 self.nulls.append_validity_mask(validity_mask);
168 }
169
170 fn extend_from_buffer<V: NativeDecimalType>(&mut self, values: &Buffer<V>) {
173 self.values.extend(values.iter().copied());
174 }
175}
176
177impl DecimalBuilder {
178 pub fn append_value<V: NativeDecimalType>(&mut self, value: V) {
179 self.values.push(value);
180 self.nulls.append(true);
181 }
182
183 pub fn append_option<V: NativeDecimalType>(&mut self, value: Option<V>) {
184 match value {
185 Some(value) => {
186 self.values.push(value);
187 self.nulls.append(true);
188 }
189 None => self.append_null(),
190 }
191 }
192
193 pub fn append_mask(&mut self, mask: Mask) {
195 self.nulls.append_validity_mask(mask);
196 }
197}
198
199impl DecimalBuilder {
200 pub fn finish_into_decimal(&mut self) -> DecimalArray {
201 let nulls = self.nulls.finish();
202
203 if let Some(null_buf) = nulls.as_ref() {
204 assert_eq!(
205 null_buf.len(),
206 self.values.len(),
207 "null buffer length must equal value buffer length"
208 );
209 }
210
211 let validity = match (nulls, self.dtype.nullability()) {
212 (None, Nullability::NonNullable) => Validity::NonNullable,
213 (Some(_), Nullability::NonNullable) => {
214 vortex_panic!("Non-nullable builder has null values")
215 }
216 (None, Nullability::Nullable) => Validity::AllValid,
217 (Some(nulls), Nullability::Nullable) => {
218 if nulls.null_count() == nulls.len() {
219 Validity::AllInvalid
220 } else {
221 Validity::Array(BoolArray::from(nulls.into_inner()).into_array())
222 }
223 }
224 };
225
226 let DType::Decimal(decimal_dtype, _) = self.dtype else {
227 vortex_panic!("DecimalBuilder must have Decimal DType");
228 };
229
230 delegate_fn!(std::mem::take(&mut self.values), |T, values| {
231 DecimalArray::new::<T>(values.freeze(), decimal_dtype, validity)
232 })
233 }
234}
235
236impl ArrayBuilder for DecimalBuilder {
237 fn as_any(&self) -> &dyn Any {
238 self
239 }
240
241 fn as_any_mut(&mut self) -> &mut dyn Any {
242 self
243 }
244
245 fn dtype(&self) -> &DType {
246 &self.dtype
247 }
248
249 fn len(&self) -> usize {
250 self.values.len()
251 }
252
253 fn append_zeros(&mut self, n: usize) {
254 self.values.push_n(0, n);
255 self.nulls.append_n_non_nulls(n);
256 }
257
258 fn append_nulls(&mut self, n: usize) {
259 self.values.push_n(0, n);
260 self.nulls.append_n_nulls(n);
261 }
262
263 fn extend_from_array(&mut self, array: &dyn Array) -> VortexResult<()> {
264 let array = array.to_decimal()?;
265
266 let DType::Decimal(decimal_dtype, _) = self.dtype else {
267 vortex_panic!("DecimalBuilder must have Decimal DType");
268 };
269
270 if array.decimal_dtype() != decimal_dtype {
271 vortex_bail!(
272 "Cannot extend from array with different decimal type: {:?} != {:?}",
273 array.decimal_dtype(),
274 decimal_dtype
275 );
276 }
277
278 match_each_decimal_value_type!(array.values_type(), |D| {
279 self.extend_from_buffer(&array.buffer::<D>())
280 });
281
282 self.extend_with_validity_mask(array.validity_mask()?);
283
284 Ok(())
285 }
286
287 fn ensure_capacity(&mut self, capacity: usize) {
288 if capacity > self.values.capacity() {
289 self.values.reserve(capacity - self.values.len());
290 self.nulls.ensure_capacity(capacity);
291 }
292 }
293
294 fn set_validity(&mut self, validity: Mask) {
295 self.nulls = LazyNullBufferBuilder::new(validity.len());
296 self.nulls.append_validity_mask(validity);
297 }
298
299 fn finish(&mut self) -> ArrayRef {
300 self.finish_into_decimal().into_array()
301 }
302}
303
#[cfg(test)]
mod tests {
    use crate::builders::{ArrayBuilder, DecimalBuilder};

    /// Values built with `i8` storage can be appended to an `i128`-backed
    /// builder and are widened on the way in.
    #[test]
    fn test_mixed_extend() {
        let mut narrow = DecimalBuilder::new::<i8>(2, 1, false.into());
        for value in [10, 11, 12] {
            narrow.append_value(value);
        }
        let narrow = narrow.finish();

        let mut wide = DecimalBuilder::new::<i128>(2, 1, false.into());
        wide.extend_from_array(&narrow).unwrap();
        let wide = wide.finish_into_decimal();

        assert_eq!(wide.buffer::<i128>().as_slice(), &[10, 11, 12]);
    }
}