vortex_array/arrays/chunked/vtable/
mod.rs1use itertools::Itertools;
5use vortex_dtype::DType;
6use vortex_dtype::Nullability;
7use vortex_dtype::PType;
8use vortex_error::VortexResult;
9use vortex_error::vortex_bail;
10use vortex_error::vortex_ensure;
11use vortex_error::vortex_err;
12use vortex_session::VortexSession;
13
14use crate::ArrayRef;
15use crate::Canonical;
16use crate::EmptyMetadata;
17use crate::ExecutionCtx;
18use crate::IntoArray;
19use crate::ToCanonical;
20use crate::arrays::ChunkedArray;
21use crate::arrays::PrimitiveArray;
22use crate::arrays::chunked::compute::kernel::PARENT_KERNELS;
23use crate::arrays::chunked::compute::rules::PARENT_RULES;
24use crate::arrays::chunked::vtable::canonical::_canonicalize;
25use crate::buffer::BufferHandle;
26use crate::builders::ArrayBuilder;
27use crate::serde::ArrayChildren;
28use crate::validity::Validity;
29use crate::vtable;
30use crate::vtable::ArrayId;
31use crate::vtable::VTable;
32
33mod array;
34mod canonical;
35mod operations;
36mod validity;
37mod visitor;
38
39vtable!(Chunked);
40
/// Zero-sized marker type that carries the vtable implementation for [`ChunkedArray`].
///
/// It holds no state; every vtable entry point is an associated function.
#[derive(Debug)]
pub struct ChunkedVTable;
43
44impl ChunkedVTable {
45 pub const ID: ArrayId = ArrayId::new_ref("vortex.chunked");
46}
47
48impl VTable for ChunkedVTable {
49 type Array = ChunkedArray;
50
51 type Metadata = EmptyMetadata;
52
53 type ArrayVTable = Self;
54 type OperationsVTable = Self;
55 type ValidityVTable = Self;
56 type VisitorVTable = Self;
57
58 fn id(_array: &Self::Array) -> ArrayId {
59 Self::ID
60 }
61
62 fn metadata(_array: &ChunkedArray) -> VortexResult<Self::Metadata> {
63 Ok(EmptyMetadata)
64 }
65
66 fn serialize(_metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
67 Ok(Some(vec![]))
68 }
69
70 fn deserialize(
71 _bytes: &[u8],
72 _dtype: &DType,
73 _len: usize,
74 _buffers: &[BufferHandle],
75 _session: &VortexSession,
76 ) -> VortexResult<Self::Metadata> {
77 Ok(EmptyMetadata)
78 }
79
80 fn build(
81 dtype: &DType,
82 _len: usize,
83 _metadata: &Self::Metadata,
84 _buffers: &[BufferHandle],
85 children: &dyn ArrayChildren,
86 ) -> VortexResult<ChunkedArray> {
87 if children.is_empty() {
88 vortex_bail!("Chunked array needs at least one child");
89 }
90
91 let nchunks = children.len() - 1;
92
93 let chunk_offsets_array = children
95 .get(
96 0,
97 &DType::Primitive(PType::U64, Nullability::NonNullable),
98 nchunks + 1,
100 )?
101 .to_primitive();
102
103 let chunk_offsets_buf = chunk_offsets_array.to_buffer::<u64>();
104
105 let chunks = chunk_offsets_buf
107 .iter()
108 .tuple_windows()
109 .enumerate()
110 .map(|(idx, (start, end))| {
111 let chunk_len = usize::try_from(end - start)
112 .map_err(|_| vortex_err!("chunk_len {} exceeds usize range", end - start))?;
113 children.get(idx + 1, dtype, chunk_len)
114 })
115 .try_collect()?;
116
117 let chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.clone(), Validity::NonNullable);
118
119 let total_len = chunk_offsets_buf
120 .last()
121 .ok_or_else(|| vortex_err!("chunk_offsets must not be empty"))?;
122 let len = usize::try_from(*total_len)
123 .map_err(|_| vortex_err!("total length {} exceeds usize range", total_len))?;
124
125 Ok(ChunkedArray {
127 dtype: dtype.clone(),
128 len,
129 chunk_offsets,
130 chunks,
131 stats_set: Default::default(),
132 })
133 }
134
135 fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
136 vortex_ensure!(
138 !children.is_empty(),
139 "Chunked array needs at least one child"
140 );
141
142 let nchunks = children.len() - 1;
143 let chunk_offsets_array = children[0].to_primitive();
144 let chunk_offsets_buf = chunk_offsets_array.to_buffer::<u64>();
145
146 vortex_ensure!(
147 chunk_offsets_buf.len() == nchunks + 1,
148 "Expected {} chunk offsets, found {}",
149 nchunks + 1,
150 chunk_offsets_buf.len()
151 );
152
153 let chunks = children.into_iter().skip(1).collect();
154 array.chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.clone(), Validity::NonNullable);
155 array.chunks = chunks;
156
157 let total_len = chunk_offsets_buf
158 .last()
159 .ok_or_else(|| vortex_err!("chunk_offsets must not be empty"))?;
160 array.len = usize::try_from(*total_len)
161 .map_err(|_| vortex_err!("total length {} exceeds usize range", total_len))?;
162
163 Ok(())
164 }
165
166 fn append_to_builder(
167 array: &ChunkedArray,
168 builder: &mut dyn ArrayBuilder,
169 ctx: &mut ExecutionCtx,
170 ) -> VortexResult<()> {
171 for chunk in array.chunks() {
172 chunk.append_to_builder(builder, ctx)?;
173 }
174 Ok(())
175 }
176
177 fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
178 Ok(_canonicalize(array, ctx)?.into_array())
179 }
180
181 fn reduce(array: &Self::Array) -> VortexResult<Option<ArrayRef>> {
182 Ok(match array.chunks.len() {
183 0 => Some(Canonical::empty(array.dtype()).into_array()),
184 1 => Some(array.chunks[0].clone()),
185 _ => None,
186 })
187 }
188
189 fn reduce_parent(
190 array: &Self::Array,
191 parent: &ArrayRef,
192 child_idx: usize,
193 ) -> VortexResult<Option<ArrayRef>> {
194 PARENT_RULES.evaluate(array, parent, child_idx)
195 }
196
197 fn execute_parent(
198 array: &Self::Array,
199 parent: &ArrayRef,
200 child_idx: usize,
201 ctx: &mut ExecutionCtx,
202 ) -> VortexResult<Option<ArrayRef>> {
203 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
204 }
205}