vortex_array/arrays/decimal/
array.rs1use arrow_buffer::BooleanBufferBuilder;
5use itertools::Itertools;
6use vortex_buffer::{Buffer, BufferMut, ByteBuffer};
7use vortex_dtype::{DType, DecimalDType, IntegerPType, match_each_integer_ptype};
8use vortex_error::{VortexExpect, VortexResult, vortex_ensure, vortex_panic};
9use vortex_scalar::{BigCast, DecimalValueType, NativeDecimalType, match_each_decimal_value_type};
10
11use crate::ToCanonical;
12use crate::arrays::is_compatible_decimal_value_type;
13use crate::patches::Patches;
14use crate::stats::ArrayStats;
15use crate::validity::Validity;
16use crate::vtable::ValidityHelper;
17
18#[derive(Clone, Debug)]
76pub struct DecimalArray {
77 pub(super) dtype: DType,
78 pub(super) values: ByteBuffer,
79 pub(super) values_type: DecimalValueType,
80 pub(super) validity: Validity,
81 pub(super) stats_set: ArrayStats,
82}
83
84impl DecimalArray {
85 pub fn new<T: NativeDecimalType>(
92 buffer: Buffer<T>,
93 decimal_dtype: DecimalDType,
94 validity: Validity,
95 ) -> Self {
96 Self::try_new(buffer, decimal_dtype, validity)
97 .vortex_expect("DecimalArray construction failed")
98 }
99
100 pub fn try_new<T: NativeDecimalType>(
109 buffer: Buffer<T>,
110 decimal_dtype: DecimalDType,
111 validity: Validity,
112 ) -> VortexResult<Self> {
113 Self::validate(&buffer, &validity)?;
114
115 Ok(unsafe { Self::new_unchecked(buffer, decimal_dtype, validity) })
117 }
118
119 pub unsafe fn new_unchecked<T: NativeDecimalType>(
133 buffer: Buffer<T>,
134 decimal_dtype: DecimalDType,
135 validity: Validity,
136 ) -> Self {
137 #[cfg(debug_assertions)]
138 Self::validate(&buffer, &validity)
139 .vortex_expect("[Debug Assertion]: Invalid `DecimalArray` parameters");
140
141 Self {
142 values: buffer.into_byte_buffer(),
143 values_type: T::VALUES_TYPE,
144 dtype: DType::Decimal(decimal_dtype, validity.nullability()),
145 validity,
146 stats_set: Default::default(),
147 }
148 }
149
150 pub fn validate<T: NativeDecimalType>(
154 buffer: &Buffer<T>,
155 validity: &Validity,
156 ) -> VortexResult<()> {
157 if let Some(len) = validity.maybe_len() {
158 vortex_ensure!(
159 buffer.len() == len,
160 "Buffer and validity length mismatch: buffer={}, validity={}",
161 buffer.len(),
162 len,
163 );
164 }
165
166 Ok(())
167 }
168
169 pub fn byte_buffer(&self) -> ByteBuffer {
171 self.values.clone()
172 }
173
174 pub fn buffer<T: NativeDecimalType>(&self) -> Buffer<T> {
175 if self.values_type != T::VALUES_TYPE {
176 vortex_panic!(
177 "Cannot extract Buffer<{:?}> for DecimalArray with values_type {:?}",
178 T::VALUES_TYPE,
179 self.values_type,
180 );
181 }
182 Buffer::<T>::from_byte_buffer(self.values.clone())
183 }
184
185 pub fn decimal_dtype(&self) -> DecimalDType {
187 if let DType::Decimal(decimal_dtype, _) = self.dtype {
188 decimal_dtype
189 } else {
190 vortex_panic!("Expected Decimal dtype, got {:?}", self.dtype)
191 }
192 }
193
194 pub fn values_type(&self) -> DecimalValueType {
195 self.values_type
196 }
197
198 pub fn precision(&self) -> u8 {
199 self.decimal_dtype().precision()
200 }
201
202 pub fn scale(&self) -> i8 {
203 self.decimal_dtype().scale()
204 }
205
206 pub fn from_option_iter<T: NativeDecimalType, I: IntoIterator<Item = Option<T>>>(
207 iter: I,
208 decimal_dtype: DecimalDType,
209 ) -> Self {
210 let iter = iter.into_iter();
211 let mut values = BufferMut::with_capacity(iter.size_hint().0);
212 let mut validity = BooleanBufferBuilder::new(values.capacity());
213
214 for i in iter {
215 match i {
216 None => {
217 validity.append(false);
218 values.push(T::default());
219 }
220 Some(e) => {
221 validity.append(true);
222 values.push(e);
223 }
224 }
225 }
226 Self::new(
227 values.freeze(),
228 decimal_dtype,
229 Validity::from(validity.finish()),
230 )
231 }
232
233 #[allow(clippy::cognitive_complexity)]
234 pub fn patch(self, patches: &Patches) -> Self {
235 let offset = patches.offset();
236 let patch_indices = patches.indices().to_primitive();
237 let patch_values = patches.values().to_decimal();
238
239 let patched_validity = self.validity().clone().patch(
240 self.len(),
241 offset,
242 patch_indices.as_ref(),
243 patch_values.validity(),
244 );
245 assert_eq!(self.decimal_dtype(), patch_values.decimal_dtype());
246
247 match_each_integer_ptype!(patch_indices.ptype(), |I| {
248 let patch_indices = patch_indices.as_slice::<I>();
249 match_each_decimal_value_type!(patch_values.values_type(), |PatchDVT| {
250 let patch_values = patch_values.buffer::<PatchDVT>();
251 match_each_decimal_value_type!(self.values_type(), |ValuesDVT| {
252 let buffer = self.buffer::<ValuesDVT>().into_mut();
253 patch_typed(
254 buffer,
255 self.decimal_dtype(),
256 patch_indices,
257 offset,
258 patch_values,
259 patched_validity,
260 )
261 })
262 })
263 })
264 }
265}
266
267fn patch_typed<I, ValuesDVT, PatchDVT>(
268 mut buffer: BufferMut<ValuesDVT>,
269 decimal_dtype: DecimalDType,
270 patch_indices: &[I],
271 patch_indices_offset: usize,
272 patch_values: Buffer<PatchDVT>,
273 patched_validity: Validity,
274) -> DecimalArray
275where
276 I: IntegerPType,
277 PatchDVT: NativeDecimalType,
278 ValuesDVT: NativeDecimalType,
279{
280 if !is_compatible_decimal_value_type(ValuesDVT::VALUES_TYPE, decimal_dtype) {
281 vortex_panic!(
282 "patch_typed: {:?} cannot represent every value in {}.",
283 ValuesDVT::VALUES_TYPE,
284 decimal_dtype
285 )
286 }
287
288 for (idx, value) in patch_indices.iter().zip_eq(patch_values.into_iter()) {
289 buffer[idx.as_() - patch_indices_offset] = <ValuesDVT as BigCast>::from(value).vortex_expect(
290 "values of a given DecimalDType are representable in all compatible NativeDecimalType",
291 );
292 }
293
294 DecimalArray::new(buffer.freeze(), decimal_dtype, patched_validity)
295}