vortex_zstd/array.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::fmt::Debug;
use std::hash::Hash;
use std::ops::Range;
use std::sync::Arc;

use itertools::Itertools as _;
use prost::Message as _;
use vortex_array::ArrayBufferVisitor;
use vortex_array::ArrayChildVisitor;
use vortex_array::ArrayEq;
use vortex_array::ArrayHash;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::IntoArray;
use vortex_array::Precision;
use vortex_array::ProstMetadata;
use vortex_array::ToCanonical;
use vortex_array::accessor::ArrayAccessor;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::buffer::BufferHandle;
use vortex_array::compute::filter;
use vortex_array::serde::ArrayChildren;
use vortex_array::stats::ArrayStats;
use vortex_array::stats::StatsSetRef;
use vortex_array::validity::Validity;
use vortex_array::vtable;
use vortex_array::vtable::ArrayId;
use vortex_array::vtable::ArrayVTable;
use vortex_array::vtable::ArrayVTableExt;
use vortex_array::vtable::BaseArrayVTable;
use vortex_array::vtable::CanonicalVTable;
use vortex_array::vtable::EncodeVTable;
use vortex_array::vtable::NotSupported;
use vortex_array::vtable::OperationsVTable;
use vortex_array::vtable::VTable;
use vortex_array::vtable::ValidityHelper;
use vortex_array::vtable::ValiditySliceHelper;
use vortex_array::vtable::ValidityVTableFromValiditySliceHelper;
use vortex_array::vtable::VisitorVTable;
use vortex_buffer::Alignment;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
use vortex_dtype::DType;
use vortex_error::VortexError;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
use vortex_mask::AllOr;
use vortex_scalar::Scalar;
use vortex_vector::binaryview::BinaryView;

use crate::ZstdFrameMetadata;
use crate::ZstdMetadata;

// Zstd doesn't support training dictionaries on very few samples.
const MIN_SAMPLES_FOR_DICTIONARY: usize = 8;
type ViewLen = u32;

// Overall approach here:
// Zstd can be applied to the whole array (values_per_frame = 0), producing a single Zstd
// frame, or it can be used with a dictionary (values_per_frame < # values), producing
// multiple Zstd frames that share a common dictionary. The latter case gives somewhat
// faster access to slices or individual rows, since we only need to decompress the
// frames that overlap the requested range.

// Visually, during decompression, we have an interval of frames we're
// decompressing and a tighter interval of the slice we actually care about.
// |=============values (all valid elements)==============|
// |<-skipped_uncompressed->|----decompressed-------------|
//                              |------slice-------|
//                              ^                  ^
// |<-slice_uncompressed_start->|                  |
// |<------------slice_uncompressed_stop---------->|
// We then insert these values at the correct positions using a primitive array
// constructor.
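//
// Illustrative example (hypothetical numbers, all rows valid, no alignment rounding):
// 1,000 i32 values with values_per_frame = 250 yield 4 frames. Slicing rows 300..320
// decompresses only the second frame (values 250..500); the requested values then sit
// at value offsets 50..70 within the decompressed buffer.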

vtable!(Zstd);

impl VTable for ZstdVTable {
    type Array = ZstdArray;

    type Metadata = ProstMetadata<ZstdMetadata>;

    type ArrayVTable = Self;
    type CanonicalVTable = Self;
    type OperationsVTable = Self;
    type ValidityVTable = ValidityVTableFromValiditySliceHelper;
    type VisitorVTable = Self;
    type ComputeVTable = NotSupported;
    type EncodeVTable = Self;

    fn id(&self) -> ArrayId {
        ArrayId::new_ref("vortex.zstd")
    }

    fn encoding(_array: &Self::Array) -> ArrayVTable {
        ZstdVTable.as_vtable()
    }

    fn metadata(array: &ZstdArray) -> VortexResult<Self::Metadata> {
        Ok(ProstMetadata(array.metadata.clone()))
    }

    fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
        Ok(Some(metadata.0.encode_to_vec()))
    }

    fn deserialize(buffer: &[u8]) -> VortexResult<Self::Metadata> {
        Ok(ProstMetadata(ZstdMetadata::decode(buffer)?))
    }

    fn build(
        &self,
        dtype: &DType,
        len: usize,
        metadata: &Self::Metadata,
        buffers: &[BufferHandle],
        children: &dyn ArrayChildren,
    ) -> VortexResult<ZstdArray> {
        let validity = if children.is_empty() {
            Validity::from(dtype.nullability())
        } else if children.len() == 1 {
            let validity = children.get(0, &Validity::DTYPE, len)?;
            Validity::Array(validity)
        } else {
            vortex_bail!("ZstdArray expected 0 or 1 child, got {}", children.len());
        };

        let (dictionary_buffer, compressed_buffers) = if metadata.0.dictionary_size == 0 {
            // no dictionary
            (
                None,
                buffers
                    .iter()
                    .map(|b| b.clone().try_to_bytes())
                    .collect::<VortexResult<Vec<_>>>()?,
            )
        } else {
            // with dictionary
            (
                Some(buffers[0].clone().try_to_bytes()?),
                buffers[1..]
                    .iter()
                    .map(|b| b.clone().try_to_bytes())
                    .collect::<VortexResult<Vec<_>>>()?,
            )
        };

        Ok(ZstdArray::new(
            dictionary_buffer,
            compressed_buffers,
            dtype.clone(),
            metadata.0.clone(),
            len,
            validity,
        ))
    }

    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
        vortex_ensure!(
            children.len() <= 1,
            "ZstdArray expects at most 1 child (validity), got {}",
            children.len()
        );

        array.unsliced_validity = if children.is_empty() {
            Validity::from(array.dtype.nullability())
        } else {
            Validity::Array(children.into_iter().next().vortex_expect("checked"))
        };

        Ok(())
    }
}

#[derive(Debug)]
pub struct ZstdVTable;

#[derive(Clone, Debug)]
pub struct ZstdArray {
    pub(crate) dictionary: Option<ByteBuffer>,
    pub(crate) frames: Vec<ByteBuffer>,
    pub(crate) metadata: ZstdMetadata,
    dtype: DType,
    pub(crate) unsliced_validity: Validity,
    unsliced_n_rows: usize,
    stats_set: ArrayStats,
    slice_start: usize,
    slice_stop: usize,
}

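/// Intermediate result of compression: an optional shared dictionary, the compressed
/// frames, and the per-frame metadata describing them.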
struct Frames {
    dictionary: Option<ByteBuffer>,
    frames: Vec<ByteBuffer>,
    frame_metas: Vec<ZstdFrameMetadata>,
}

fn choose_max_dict_size(uncompressed_size: usize) -> usize {
    // following recommendations from
    // https://github.com/facebook/zstd/blob/v1.5.5/lib/zdict.h#L190
    // that is, 1/100 the data size, up to 100kB.
    // It appears that zstd can't train dictionaries with <256 bytes.
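    // For example, 1 MiB of input gives a dictionary budget of roughly 10 KiB, while
    // very small inputs are floored at the 256-byte minimum.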
    (uncompressed_size / 100).clamp(256, 100 * 1024)
}

fn collect_valid_primitive(parray: &PrimitiveArray) -> VortexResult<PrimitiveArray> {
    let mask = parray.validity_mask();
    Ok(filter(&parray.to_array(), &mask)?.to_primitive())
}

fn collect_valid_vbv(vbv: &VarBinViewArray) -> VortexResult<(ByteBuffer, Vec<usize>)> {
    let mask = vbv.validity_mask();
    let buffer_and_value_byte_indices = match mask.bit_buffer() {
        AllOr::None => (Buffer::empty(), Vec::new()),
        _ => {
            let mut buffer = BufferMut::with_capacity(
                usize::try_from(vbv.nbytes()).vortex_expect("must fit into buffer")
                    + mask.true_count() * size_of::<ViewLen>(),
            );
            let mut value_byte_indices = Vec::new();
            vbv.with_iterator(|iterator| {
                // flattening the Option iterator skips nulls
                for value in iterator.flatten() {
                    value_byte_indices.push(buffer.len());
                    // prefix each value with its length as a little-endian ViewLen (u32)
                    buffer
                        .extend_trusted(ViewLen::try_from(value.len())?.to_le_bytes().into_iter());
                    buffer.extend_from_slice(value);
                }
                Ok::<_, VortexError>(())
            })?;
            (buffer.freeze(), value_byte_indices)
        }
    };
    Ok(buffer_and_value_byte_indices)
}

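/// Rebuilds `BinaryView`s from a decompressed buffer of interleaved little-endian
/// `ViewLen` length prefixes and value bytes, as produced by `collect_valid_vbv`.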
fn reconstruct_views(buffer: ByteBuffer) -> Buffer<BinaryView> {
    let mut res = BufferMut::<BinaryView>::empty();
    let mut offset = 0;
    while offset < buffer.len() {
        let str_len = ViewLen::from_le_bytes(
            buffer
                .get(offset..offset + size_of::<ViewLen>())
                .vortex_expect("corrupted zstd length")
                .try_into()
                .vortex_expect("must fit ViewLen size"),
        ) as usize;
        offset += size_of::<ViewLen>();
        let value = &buffer[offset..offset + str_len];
        res.push(BinaryView::make_view(
            value,
            0,
            u32::try_from(offset).vortex_expect("offset must fit in u32"),
        ));
        offset += str_len;
    }
    res.freeze()
}

impl ZstdArray {
    pub fn new(
        dictionary: Option<ByteBuffer>,
        frames: Vec<ByteBuffer>,
        dtype: DType,
        metadata: ZstdMetadata,
        n_rows: usize,
        validity: Validity,
    ) -> Self {
        Self {
            dictionary,
            frames,
            metadata,
            dtype,
            unsliced_validity: validity,
            unsliced_n_rows: n_rows,
            stats_set: Default::default(),
            slice_start: 0,
            slice_stop: n_rows,
        }
    }

    fn compress_values(
        value_bytes: &ByteBuffer,
        frame_byte_starts: &[usize],
        level: i32,
        values_per_frame: usize,
        n_values: usize,
    ) -> VortexResult<Frames> {
        let n_frames = frame_byte_starts.len();

        // Sample sizes to use if we end up training a zstd dictionary.
        let mut sample_sizes = Vec::with_capacity(n_frames);
        for i in 0..n_frames {
            let frame_byte_end = frame_byte_starts
                .get(i + 1)
                .copied()
                .unwrap_or(value_bytes.len());
            sample_sizes.push(frame_byte_end - frame_byte_starts[i]);
        }
        debug_assert_eq!(sample_sizes.iter().sum::<usize>(), value_bytes.len());

        let (dictionary, mut compressor) = if sample_sizes.len() < MIN_SAMPLES_FOR_DICTIONARY {
            // no dictionary
            (None, zstd::bulk::Compressor::new(level)?)
        } else {
            // with dictionary
            let max_dict_size = choose_max_dict_size(value_bytes.len());
            let dict = zstd::dict::from_continuous(value_bytes, &sample_sizes, max_dict_size)
                .map_err(|err| VortexError::from(err).with_context("while training dictionary"))?;

            let compressor = zstd::bulk::Compressor::with_dictionary(level, &dict)?;
            (Some(ByteBuffer::from(dict)), compressor)
        };

        let mut frame_metas = vec![];
        let mut frames = vec![];
        for i in 0..n_frames {
            let frame_byte_end = frame_byte_starts
                .get(i + 1)
                .copied()
                .unwrap_or(value_bytes.len());

            let uncompressed = &value_bytes.slice(frame_byte_starts[i]..frame_byte_end);
            let compressed = compressor
                .compress(uncompressed)
                .map_err(|err| VortexError::from(err).with_context("while compressing"))?;
            frame_metas.push(ZstdFrameMetadata {
                uncompressed_size: uncompressed.len() as u64,
                n_values: values_per_frame.min(n_values - i * values_per_frame) as u64,
            });
            frames.push(ByteBuffer::from(compressed));
        }

        Ok(Frames {
            dictionary,
            frames,
            frame_metas,
        })
    }

    pub fn from_primitive(
        parray: &PrimitiveArray,
        level: i32,
        values_per_frame: usize,
    ) -> VortexResult<Self> {
        let dtype = parray.dtype().clone();
        let byte_width = parray.ptype().byte_width();

        // We compress only the valid elements.
        let values = collect_valid_primitive(parray)?;
        let n_values = values.len();
        let values_per_frame = if values_per_frame > 0 {
            values_per_frame
        } else {
            n_values
        };

        let value_bytes = values.byte_buffer();
        // Align frames to buffer alignment. This is necessary for overaligned buffers.
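        // For example (illustrative numbers): 100 i32 values per frame is 400 bytes,
        // which rounds up to a 448-byte step for a 64-byte-aligned buffer.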
        let alignment = *value_bytes.alignment();
        let step_width = (values_per_frame * byte_width).div_ceil(alignment) * alignment;

        let frame_byte_starts = (0..n_values * byte_width)
            .step_by(step_width)
            .collect::<Vec<_>>();
        let Frames {
            dictionary,
            frames,
            frame_metas,
        } = Self::compress_values(
            value_bytes,
            &frame_byte_starts,
            level,
            values_per_frame,
            n_values,
        )?;

        let metadata = ZstdMetadata {
            dictionary_size: dictionary
                .as_ref()
                .map_or(0, |dict| dict.len())
                .try_into()?,
            frames: frame_metas,
        };

        Ok(ZstdArray::new(
            dictionary,
            frames,
            dtype,
            metadata,
            parray.len(),
            parray.validity().clone(),
        ))
    }

    pub fn from_var_bin_view(
        vbv: &VarBinViewArray,
        level: i32,
        values_per_frame: usize,
    ) -> VortexResult<Self> {
        // Approach for strings: we prefix each string with its length as a u32.
        // This is the same as what Parquet does. In some cases it may be better
        // to store the binary data and the lengths as two separate streams, but
        // this approach is simpler and can win when there is mutual information
        // between strings and their lengths.
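        //
        // For example (illustrative), the values ["ab", "cde"] are laid out as
        //   [02 00 00 00] 'a' 'b' [03 00 00 00] 'c' 'd' 'e'
        // before compression.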
        let dtype = vbv.dtype().clone();

        // We compress only the valid elements.
        let (value_bytes, value_byte_indices) = collect_valid_vbv(vbv)?;
        let n_values = value_byte_indices.len();
        let values_per_frame = if values_per_frame > 0 {
            values_per_frame
        } else {
            n_values
        };

        let frame_byte_starts = (0..n_values)
            .step_by(values_per_frame)
            .map(|i| value_byte_indices[i])
            .collect::<Vec<_>>();
        let Frames {
            dictionary,
            frames,
            frame_metas,
        } = Self::compress_values(
            &value_bytes,
            &frame_byte_starts,
            level,
            values_per_frame,
            n_values,
        )?;

        let metadata = ZstdMetadata {
            dictionary_size: dictionary
                .as_ref()
                .map_or(0, |dict| dict.len())
                .try_into()?,
            frames: frame_metas,
        };
        Ok(ZstdArray::new(
            dictionary,
            frames,
            dtype,
            metadata,
            vbv.len(),
            vbv.validity().clone(),
        ))
    }

    pub fn from_canonical(
        canonical: &Canonical,
        level: i32,
        values_per_frame: usize,
    ) -> VortexResult<Option<Self>> {
        match canonical {
            Canonical::Primitive(parray) => Ok(Some(ZstdArray::from_primitive(
                parray,
                level,
                values_per_frame,
            )?)),
            Canonical::VarBinView(vbv) => Ok(Some(ZstdArray::from_var_bin_view(
                vbv,
                level,
                values_per_frame,
            )?)),
            _ => Ok(None),
        }
    }

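    /// Compresses an array by first canonicalizing it; errors if the canonical form is
    /// neither Primitive nor VarBinView.
    ///
    /// Illustrative usage (hypothetical variable; `values_per_frame = 0` puts everything
    /// into a single frame):
    /// ```ignore
    /// let zstd = ZstdArray::from_array(some_array, 3, 0)?;
    /// ```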
    pub fn from_array(array: ArrayRef, level: i32, values_per_frame: usize) -> VortexResult<Self> {
        Self::from_canonical(&array.to_canonical(), level, values_per_frame)?
            .ok_or_else(|| vortex_err!("Zstd can only encode Primitive and VarBinView arrays"))
    }

    fn byte_width(&self) -> usize {
        if self.dtype.is_primitive() {
            self.dtype.as_ptype().byte_width()
        } else {
            1
        }
    }

    pub fn decompress(&self) -> ArrayRef {
        // To start, we figure out which frames we need to decompress, and with
        // what row offset into the first such frame.
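        // Illustrative example (assuming `valid_counts_for_indices` returns the number
        // of valid values strictly before each index): with unsliced rows [a, null, b, c]
        // and a slice of 1..3, the counts at [1, 3] are [1, 2], so we only need
        // compressed values 1..2 (just "b").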
        let byte_width = self.byte_width();
        let slice_n_rows = self.slice_stop - self.slice_start;
        let slice_value_indices = self
            .unsliced_validity
            .to_mask(self.unsliced_n_rows)
            .valid_counts_for_indices(&[self.slice_start, self.slice_stop]);

        let slice_value_idx_start = slice_value_indices[0];
        let slice_value_idx_stop = slice_value_indices[1];

        let mut frames_to_decompress = vec![];
        let mut value_idx_start = 0;
        let mut uncompressed_size_to_decompress = 0;
        let mut n_skipped_values = 0;
        for (frame, frame_meta) in self.frames.iter().zip(&self.metadata.frames) {
            if value_idx_start >= slice_value_idx_stop {
                break;
            }

            let frame_uncompressed_size = usize::try_from(frame_meta.uncompressed_size)
                .vortex_expect("Uncompressed size must fit in usize");
            let frame_n_values = if frame_meta.n_values == 0 {
                // possibly older primitive-only metadata that just didn't store this
                frame_uncompressed_size / byte_width
            } else {
                usize::try_from(frame_meta.n_values).vortex_expect("frame size must fit usize")
            };

            let value_idx_stop = value_idx_start + frame_n_values;
            if value_idx_stop > slice_value_idx_start {
                // we need this frame
                frames_to_decompress.push(frame);
                uncompressed_size_to_decompress += frame_uncompressed_size;
            } else {
                n_skipped_values += frame_n_values;
            }
            value_idx_start = value_idx_stop;
        }

        // then we actually decompress those frames
        let mut decompressor = if let Some(dictionary) = &self.dictionary {
            zstd::bulk::Decompressor::with_dictionary(dictionary)
        } else {
            zstd::bulk::Decompressor::new()
        }
        .vortex_expect("Decompressor encountered io error");
        let mut decompressed = ByteBufferMut::with_capacity_aligned(
            uncompressed_size_to_decompress,
            Alignment::new(byte_width),
        );
        unsafe {
            // safety: we immediately fill all bytes in the following loop,
            // assuming our metadata's uncompressed size is correct
            decompressed.set_len(uncompressed_size_to_decompress);
        }
        let mut uncompressed_start = 0;
        for frame in frames_to_decompress {
            let uncompressed_written = decompressor
                .decompress_to_buffer(frame.as_slice(), &mut decompressed[uncompressed_start..])
                .vortex_expect("error while decompressing zstd array");
            uncompressed_start += uncompressed_written;
        }
        if uncompressed_start != uncompressed_size_to_decompress {
            vortex_panic!(
                "Zstd metadata or frames were corrupt; expected {} bytes but decompressed {}",
                uncompressed_size_to_decompress,
                uncompressed_start
            );
        }

        let decompressed = decompressed.freeze();
        // Last, we slice the exact values requested out of the decompressed data.
        let mut slice_validity = self
            .unsliced_validity
            .slice(self.slice_start..self.slice_stop);

        // NOTE: this block handles setting the output validity when the validity and DType disagree.
        //
        // Zstd is a compact block compressor: null values are not stored inline in the data
        // frames, and a ZstdArray always holds onto its full validity bitmap, even when
        // sliced down to a range that contains no nulls.
        //
        // We ensure that the validity of the decompressed array ALWAYS matches the validity
        // implied by the DType.
        if !self.dtype().is_nullable() && slice_validity != Validity::NonNullable {
            assert!(
                slice_validity.all_valid(slice_n_rows),
                "ZSTD array expects to be non-nullable but there are nulls after decompression"
            );

            slice_validity = Validity::NonNullable;
        } else if self.dtype.is_nullable() && slice_validity == Validity::NonNullable {
            slice_validity = Validity::AllValid;
        }
        //
        // END OF IMPORTANT BLOCK
        //

        match &self.dtype {
            DType::Primitive(..) => {
                let slice_values_buffer = decompressed.slice(
                    (slice_value_idx_start - n_skipped_values) * byte_width
                        ..(slice_value_idx_stop - n_skipped_values) * byte_width,
                );
                let primitive = PrimitiveArray::from_values_byte_buffer(
                    slice_values_buffer,
                    self.dtype.as_ptype(),
                    slice_validity,
                    slice_n_rows,
                );

                primitive.into_array()
            }
            DType::Binary(_) | DType::Utf8(_) => {
                match slice_validity.to_mask(slice_n_rows).indices() {
                    AllOr::All => {
                        // The decompressed buffer is interleaved u32 lengths and strings
                        // of those lengths; we reconstruct the views into those strings
                        // by walking the buffer.
                        let valid_views = reconstruct_views(decompressed.clone()).slice(
                            slice_value_idx_start - n_skipped_values
                                ..slice_value_idx_stop - n_skipped_values,
                        );

                        // SAFETY: we properly construct the views inside `reconstruct_views`
                        unsafe {
                            VarBinViewArray::new_unchecked(
                                valid_views,
                                Arc::from([decompressed]),
                                self.dtype.clone(),
                                slice_validity,
                            )
                        }
                        .into_array()
                    }
                    AllOr::None => {
                        ConstantArray::new(Scalar::null(self.dtype.clone()), slice_n_rows)
                            .into_array()
                    }
                    AllOr::Some(valid_indices) => {
                        // The decompressed buffer is interleaved u32 lengths and strings
                        // of those lengths; we reconstruct the views into those strings
                        // by walking the buffer.
                        let valid_views = reconstruct_views(decompressed.clone()).slice(
                            slice_value_idx_start - n_skipped_values
                                ..slice_value_idx_stop - n_skipped_values,
                        );

                        let mut views = BufferMut::<BinaryView>::zeroed(slice_n_rows);
                        for (view, index) in valid_views.into_iter().zip_eq(valid_indices) {
                            views[*index] = view
                        }

                        // SAFETY: we properly construct the views inside `reconstruct_views`
                        unsafe {
                            VarBinViewArray::new_unchecked(
                                views.freeze(),
                                Arc::from([decompressed]),
                                self.dtype.clone(),
                                slice_validity,
                            )
                        }
                        .into_array()
                    }
                }
            }
            _ => vortex_panic!("Unsupported dtype for Zstd array: {}", self.dtype),
        }
    }

    pub(crate) fn _slice(&self, start: usize, stop: usize) -> ZstdArray {
        let new_start = self.slice_start + start;
        let new_stop = self.slice_start + stop;

        assert!(
            new_start <= self.slice_stop,
            "new slice start {new_start} exceeds end {}",
            self.slice_stop
        );

        assert!(
            new_stop <= self.slice_stop,
            "new slice stop {new_stop} exceeds end {}",
            self.slice_stop
        );

        ZstdArray {
            slice_start: self.slice_start + start,
            slice_stop: self.slice_start + stop,
            stats_set: Default::default(),
            ..self.clone()
        }
    }

    pub(crate) fn dtype(&self) -> &DType {
        &self.dtype
    }

    pub(crate) fn slice_start(&self) -> usize {
        self.slice_start
    }

    pub(crate) fn slice_stop(&self) -> usize {
        self.slice_stop
    }

    pub(crate) fn unsliced_n_rows(&self) -> usize {
        self.unsliced_n_rows
    }
}

impl ValiditySliceHelper for ZstdArray {
    fn unsliced_validity_and_slice(&self) -> (&Validity, usize, usize) {
        (&self.unsliced_validity, self.slice_start, self.slice_stop)
    }
}

impl BaseArrayVTable<ZstdVTable> for ZstdVTable {
    fn len(array: &ZstdArray) -> usize {
        array.slice_stop - array.slice_start
    }

    fn dtype(array: &ZstdArray) -> &DType {
        &array.dtype
    }

    fn stats(array: &ZstdArray) -> StatsSetRef<'_> {
        array.stats_set.to_ref(array.as_ref())
    }

    fn array_hash<H: std::hash::Hasher>(array: &ZstdArray, state: &mut H, precision: Precision) {
        match &array.dictionary {
            Some(dict) => {
                true.hash(state);
                dict.array_hash(state, precision);
            }
            None => {
                false.hash(state);
            }
        }
        for frame in &array.frames {
            frame.array_hash(state, precision);
        }
        array.dtype.hash(state);
        array.unsliced_validity.array_hash(state, precision);
        array.unsliced_n_rows.hash(state);
        array.slice_start.hash(state);
        array.slice_stop.hash(state);
    }

    fn array_eq(array: &ZstdArray, other: &ZstdArray, precision: Precision) -> bool {
        if !match (&array.dictionary, &other.dictionary) {
            (Some(d1), Some(d2)) => d1.array_eq(d2, precision),
            (None, None) => true,
            _ => false,
        } {
            return false;
        }
        if array.frames.len() != other.frames.len() {
            return false;
        }
        for (a, b) in array.frames.iter().zip(&other.frames) {
            if !a.array_eq(b, precision) {
                return false;
            }
        }
        array.dtype == other.dtype
            && array
                .unsliced_validity
                .array_eq(&other.unsliced_validity, precision)
            && array.unsliced_n_rows == other.unsliced_n_rows
            && array.slice_start == other.slice_start
            && array.slice_stop == other.slice_stop
    }
}

impl CanonicalVTable<ZstdVTable> for ZstdVTable {
    fn canonicalize(array: &ZstdArray) -> Canonical {
        array.decompress().to_canonical()
    }
}

impl OperationsVTable<ZstdVTable> for ZstdVTable {
    fn slice(array: &ZstdArray, range: Range<usize>) -> ArrayRef {
        array._slice(range.start, range.end).into_array()
    }

    fn scalar_at(array: &ZstdArray, index: usize) -> Scalar {
        array._slice(index, index + 1).decompress().scalar_at(0)
    }
}

impl EncodeVTable<ZstdVTable> for ZstdVTable {
    fn encode(
        _vtable: &ZstdVTable,
        canonical: &Canonical,
        _like: Option<&ZstdArray>,
    ) -> VortexResult<Option<ZstdArray>> {
        ZstdArray::from_canonical(canonical, 3, 0)
    }
}

impl VisitorVTable<ZstdVTable> for ZstdVTable {
    fn visit_buffers(array: &ZstdArray, visitor: &mut dyn ArrayBufferVisitor) {
        if let Some(buffer) = &array.dictionary {
            visitor.visit_buffer(buffer);
        }
        for buffer in &array.frames {
            visitor.visit_buffer(buffer);
        }
    }

    fn visit_children(array: &ZstdArray, visitor: &mut dyn ArrayChildVisitor) {
        visitor.visit_validity(&array.unsliced_validity, array.unsliced_n_rows());
    }
}