vortex_array/arrays/varbin/
mod.rs1use std::fmt::Debug;
2
3pub use compute::compute_min_max;
4use num_traits::PrimInt;
5use vortex_buffer::ByteBuffer;
6use vortex_dtype::{DType, NativePType, Nullability};
7use vortex_error::{
8 VortexExpect as _, VortexResult, VortexUnwrap as _, vortex_bail, vortex_err, vortex_panic,
9};
10use vortex_mask::Mask;
11use vortex_scalar::Scalar;
12
13use crate::array::ArrayValidityImpl;
14use crate::arrays::varbin::builder::VarBinBuilder;
15use crate::arrays::varbin::serde::VarBinMetadata;
16use crate::compute::scalar_at;
17use crate::stats::{ArrayStats, StatsSetRef};
18use crate::validity::Validity;
19use crate::vtable::VTableRef;
20use crate::{
21 Array, ArrayImpl, ArrayRef, ArrayStatisticsImpl, Encoding, RkyvMetadata, try_from_array_ref,
22};
23
24mod accessor;
25pub mod builder;
26mod canonical;
27mod compute;
28mod serde;
29mod variants;
30
31#[derive(Clone, Debug)]
32pub struct VarBinArray {
33 dtype: DType,
34 bytes: ByteBuffer,
35 offsets: ArrayRef,
36 validity: Validity,
37 stats_set: ArrayStats,
38}
39
40try_from_array_ref!(VarBinArray);
41
42pub struct VarBinEncoding;
43impl Encoding for VarBinEncoding {
44 type Array = VarBinArray;
45 type Metadata = RkyvMetadata<VarBinMetadata>;
46}
47
48impl VarBinArray {
49 pub fn try_new(
50 offsets: ArrayRef,
51 bytes: ByteBuffer,
52 dtype: DType,
53 validity: Validity,
54 ) -> VortexResult<Self> {
55 if !offsets.dtype().is_int() || offsets.dtype().is_nullable() {
56 vortex_bail!(MismatchedTypes: "non nullable int", offsets.dtype());
57 }
58 if !matches!(dtype, DType::Binary(_) | DType::Utf8(_)) {
59 vortex_bail!(MismatchedTypes: "utf8 or binary", dtype);
60 }
61 if dtype.is_nullable() == (validity == Validity::NonNullable) {
62 vortex_bail!("incorrect validity {:?}", validity);
63 }
64
65 Ok(Self {
66 dtype,
67 bytes,
68 offsets,
69 validity,
70 stats_set: Default::default(),
71 })
72 }
73
74 #[inline]
75 pub fn offsets(&self) -> &ArrayRef {
76 &self.offsets
77 }
78
79 pub fn validity(&self) -> &Validity {
80 &self.validity
81 }
82
83 #[inline]
91 pub fn bytes(&self) -> &ByteBuffer {
92 &self.bytes
93 }
94
95 pub fn sliced_bytes(&self) -> ByteBuffer {
98 let first_offset: usize = self.offset_at(0).vortex_expect("1st offset");
99 let last_offset = self.offset_at(self.len()).vortex_expect("Last offset");
100
101 self.bytes().slice(first_offset..last_offset)
102 }
103
104 pub fn from_vec<T: AsRef<[u8]>>(vec: Vec<T>, dtype: DType) -> Self {
105 let size: usize = vec.iter().map(|v| v.as_ref().len()).sum();
106 if size < u32::MAX as usize {
107 Self::from_vec_sized::<u32, T>(vec, dtype)
108 } else {
109 Self::from_vec_sized::<u64, T>(vec, dtype)
110 }
111 }
112
113 fn from_vec_sized<O, T>(vec: Vec<T>, dtype: DType) -> Self
114 where
115 O: NativePType + PrimInt,
116 T: AsRef<[u8]>,
117 {
118 let mut builder = VarBinBuilder::<O>::with_capacity(vec.len());
119 for v in vec {
120 builder.append_value(v.as_ref());
121 }
122 builder.finish(dtype)
123 }
124
125 #[allow(clippy::same_name_method)]
126 pub fn from_iter<T: AsRef<[u8]>, I: IntoIterator<Item = Option<T>>>(
127 iter: I,
128 dtype: DType,
129 ) -> Self {
130 let iter = iter.into_iter();
131 let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
132 for v in iter {
133 builder.append(v.as_ref().map(|o| o.as_ref()));
134 }
135 builder.finish(dtype)
136 }
137
138 pub fn from_iter_nonnull<T: AsRef<[u8]>, I: IntoIterator<Item = T>>(
139 iter: I,
140 dtype: DType,
141 ) -> Self {
142 let iter = iter.into_iter();
143 let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
144 for v in iter {
145 builder.append_value(v);
146 }
147 builder.finish(dtype)
148 }
149
150 pub fn offset_at(&self, index: usize) -> VortexResult<usize> {
154 if index > self.len() + 1 {
155 vortex_bail!(OutOfBounds: index, 0, self.len() + 1)
156 }
157
158 Ok(scalar_at(self.offsets(), index)
160 .unwrap_or_else(|err| vortex_panic!(err, "Failed to get offset at index: {}", index))
161 .as_ref()
162 .try_into()
163 .vortex_expect("Failed to convert offset to usize"))
164 }
165
166 pub fn bytes_at(&self, index: usize) -> VortexResult<ByteBuffer> {
170 let start = self.offset_at(index)?;
171 let end = self.offset_at(index + 1)?;
172
173 Ok(self.bytes().slice(start..end))
174 }
175
176 pub fn into_parts(self) -> (DType, ByteBuffer, ArrayRef, Validity) {
179 (self.dtype, self.bytes, self.offsets, self.validity)
180 }
181}
182
183impl ArrayValidityImpl for VarBinArray {
184 fn _is_valid(&self, index: usize) -> VortexResult<bool> {
185 self.validity.is_valid(index)
186 }
187
188 fn _all_valid(&self) -> VortexResult<bool> {
189 self.validity.all_valid()
190 }
191
192 fn _all_invalid(&self) -> VortexResult<bool> {
193 self.validity.all_invalid()
194 }
195
196 fn _validity_mask(&self) -> VortexResult<Mask> {
197 self.validity.to_mask(self.len())
198 }
199}
200
201impl ArrayImpl for VarBinArray {
202 type Encoding = VarBinEncoding;
203
204 fn _len(&self) -> usize {
205 self.offsets().len().saturating_sub(1)
206 }
207
208 fn _dtype(&self) -> &DType {
209 &self.dtype
210 }
211
212 fn _vtable(&self) -> VTableRef {
213 VTableRef::new_ref(&VarBinEncoding)
214 }
215
216 fn _with_children(&self, children: &[ArrayRef]) -> VortexResult<Self> {
217 let new = match children.len() {
218 1 => {
220 let offsets = children[0].clone();
221 Self::try_new(
222 offsets,
223 self.bytes().clone(),
224 self.dtype().clone(),
225 self.validity().clone(),
226 )?
227 }
228 2 => {
230 let offsets = children[0].clone();
231 let validity_array = children[1].clone();
232 Self::try_new(
233 offsets,
234 self.bytes().clone(),
235 self.dtype().clone(),
236 Validity::Array(validity_array),
237 )?
238 }
239 _ => vortex_bail!("unexpected number of new children"),
240 };
241
242 Ok(new)
243 }
244}
245
246impl ArrayStatisticsImpl for VarBinArray {
247 fn _stats_ref(&self) -> StatsSetRef<'_> {
248 self.stats_set.to_ref(self)
249 }
250}
251
252impl From<Vec<&[u8]>> for VarBinArray {
253 fn from(value: Vec<&[u8]>) -> Self {
254 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
255 }
256}
257
258impl From<Vec<Vec<u8>>> for VarBinArray {
259 fn from(value: Vec<Vec<u8>>) -> Self {
260 Self::from_vec(value, DType::Binary(Nullability::NonNullable))
261 }
262}
263
264impl From<Vec<String>> for VarBinArray {
265 fn from(value: Vec<String>) -> Self {
266 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
267 }
268}
269
270impl From<Vec<&str>> for VarBinArray {
271 fn from(value: Vec<&str>) -> Self {
272 Self::from_vec(value, DType::Utf8(Nullability::NonNullable))
273 }
274}
275
276impl<'a> FromIterator<Option<&'a [u8]>> for VarBinArray {
277 fn from_iter<T: IntoIterator<Item = Option<&'a [u8]>>>(iter: T) -> Self {
278 Self::from_iter(iter, DType::Binary(Nullability::Nullable))
279 }
280}
281
282impl FromIterator<Option<Vec<u8>>> for VarBinArray {
283 fn from_iter<T: IntoIterator<Item = Option<Vec<u8>>>>(iter: T) -> Self {
284 Self::from_iter(iter, DType::Binary(Nullability::Nullable))
285 }
286}
287
288impl FromIterator<Option<String>> for VarBinArray {
289 fn from_iter<T: IntoIterator<Item = Option<String>>>(iter: T) -> Self {
290 Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
291 }
292}
293
294impl<'a> FromIterator<Option<&'a str>> for VarBinArray {
295 fn from_iter<T: IntoIterator<Item = Option<&'a str>>>(iter: T) -> Self {
296 Self::from_iter(iter, DType::Utf8(Nullability::Nullable))
297 }
298}
299
300pub fn varbin_scalar(value: ByteBuffer, dtype: &DType) -> Scalar {
301 if matches!(dtype, DType::Utf8(_)) {
302 Scalar::try_utf8(value, dtype.nullability())
303 .map_err(|err| vortex_err!("Failed to create scalar from utf8 buffer: {}", err))
304 .vortex_unwrap()
305 } else {
306 Scalar::binary(value, dtype.nullability())
307 }
308}
309
#[cfg(test)]
mod test {
    use rstest::{fixture, rstest};
    use vortex_buffer::Buffer;
    use vortex_dtype::{DType, Nullability};

    use crate::ArrayRef;
    use crate::array::Array;
    use crate::arrays::primitive::PrimitiveArray;
    use crate::arrays::varbin::VarBinArray;
    use crate::compute::{scalar_at, slice};
    use crate::validity::Validity;

    /// Two-element non-nullable UTF-8 array: "hello world" followed by
    /// "hello world this is a long string".
    #[fixture]
    fn binary_array() -> ArrayRef {
        let bytes = Buffer::copy_from("hello worldhello world this is a long string".as_bytes());
        // Offsets 0..11 and 11..44 delimit the two strings inside `bytes`.
        let offsets = PrimitiveArray::from_iter([0, 11, 44]).into_array();

        let array = VarBinArray::try_new(
            offsets,
            bytes,
            DType::Utf8(Nullability::NonNullable),
            Validity::NonNullable,
        )
        .unwrap();

        array.into_array()
    }

    /// Each element can be read back as a scalar.
    #[rstest]
    pub fn test_scalar_at(binary_array: ArrayRef) {
        assert_eq!(binary_array.len(), 2);

        let first = scalar_at(&binary_array, 0).unwrap();
        assert_eq!(first, "hello world".into());

        let second = scalar_at(&binary_array, 1).unwrap();
        assert_eq!(second, "hello world this is a long string".into());
    }

    /// Slicing off the first element leaves the second one at index 0.
    #[rstest]
    pub fn slice_array(binary_array: ArrayRef) {
        let tail = slice(&binary_array, 1, 2).unwrap();
        let only = scalar_at(&tail, 0).unwrap();
        assert_eq!(only, "hello world this is a long string".into());
    }
}