1use std::cmp;
5use std::fmt::Debug;
6use std::hash::Hash;
7use std::sync::Arc;
8
9use pco::ChunkConfig;
10use pco::PagingSpec;
11use pco::data_types::Number;
12use pco::data_types::NumberType;
13use pco::errors::PcoError;
14use pco::match_number_enum;
15use pco::wrapped::ChunkDecompressor;
16use pco::wrapped::FileCompressor;
17use pco::wrapped::FileDecompressor;
18use prost::Message;
19use vortex_array::ArrayEq;
20use vortex_array::ArrayHash;
21use vortex_array::ArrayRef;
22use vortex_array::DynArray;
23use vortex_array::ExecutionCtx;
24use vortex_array::ExecutionResult;
25use vortex_array::IntoArray;
26use vortex_array::LEGACY_SESSION;
27use vortex_array::Precision;
28use vortex_array::ProstMetadata;
29use vortex_array::ToCanonical;
30use vortex_array::VortexSessionExecute;
31use vortex_array::arrays::Primitive;
32use vortex_array::arrays::PrimitiveArray;
33use vortex_array::buffer::BufferHandle;
34use vortex_array::dtype::DType;
35use vortex_array::dtype::PType;
36use vortex_array::dtype::half;
37use vortex_array::scalar::Scalar;
38use vortex_array::serde::ArrayChildren;
39use vortex_array::stats::ArrayStats;
40use vortex_array::stats::StatsSetRef;
41use vortex_array::validity::Validity;
42use vortex_array::vtable;
43use vortex_array::vtable::ArrayId;
44use vortex_array::vtable::OperationsVTable;
45use vortex_array::vtable::VTable;
46use vortex_array::vtable::ValidityHelper;
47use vortex_array::vtable::ValiditySliceHelper;
48use vortex_array::vtable::ValidityVTableFromValiditySliceHelper;
49use vortex_array::vtable::validity_nchildren;
50use vortex_array::vtable::validity_to_child;
51use vortex_buffer::BufferMut;
52use vortex_buffer::ByteBuffer;
53use vortex_buffer::ByteBufferMut;
54use vortex_error::VortexError;
55use vortex_error::VortexExpect as _;
56use vortex_error::VortexResult;
57use vortex_error::vortex_bail;
58use vortex_error::vortex_ensure;
59use vortex_error::vortex_err;
60use vortex_error::vortex_panic;
61use vortex_session::VortexSession;
62
63use crate::PcoChunkInfo;
64use crate::PcoMetadata;
65use crate::PcoPageInfo;
66
/// Default number of values compressed into a single Pco chunk (reuses pco's
/// default maximum page size).
const VALUES_PER_CHUNK: usize = pco::DEFAULT_MAX_PAGE_N;

// Generates the encoding boilerplate tying `Pco` into the vtable framework.
vtable!(Pco);
88
impl VTable for Pco {
    type Array = PcoArray;

    type Metadata = ProstMetadata<PcoMetadata>;
    type OperationsVTable = Self;
    type ValidityVTable = ValidityVTableFromValiditySliceHelper;

    fn vtable(_array: &Self::Array) -> &Self {
        &Pco
    }

    fn id(&self) -> ArrayId {
        Self::ID
    }

    /// Logical length is the width of the current slice, not the unsliced row count.
    fn len(array: &PcoArray) -> usize {
        array.slice_stop - array.slice_start
    }

    fn dtype(array: &PcoArray) -> &DType {
        &array.dtype
    }

    fn stats(array: &PcoArray) -> StatsSetRef<'_> {
        array.stats_set.to_ref(array.as_ref())
    }

    /// Hashes dtype, validity, row count, slice bounds, and the raw compressed
    /// buffers. `stats_set` is not hashed (it is recomputed, not identity —
    /// see `_slice`, which resets it).
    fn array_hash<H: std::hash::Hasher>(array: &PcoArray, state: &mut H, precision: Precision) {
        array.dtype.hash(state);
        array.unsliced_validity.array_hash(state, precision);
        array.unsliced_n_rows.hash(state);
        array.slice_start.hash(state);
        array.slice_stop.hash(state);
        for chunk_meta in &array.chunk_metas {
            chunk_meta.array_hash(state, precision);
        }
        for page in &array.pages {
            page.array_hash(state, precision);
        }
    }

    /// Structural equality over the same fields as `array_hash`.
    fn array_eq(array: &PcoArray, other: &PcoArray, precision: Precision) -> bool {
        // Cheap scalar comparisons first; the length checks also guard the
        // element-wise zips below.
        if array.dtype != other.dtype
            || !array
                .unsliced_validity
                .array_eq(&other.unsliced_validity, precision)
            || array.unsliced_n_rows != other.unsliced_n_rows
            || array.slice_start != other.slice_start
            || array.slice_stop != other.slice_stop
            || array.chunk_metas.len() != other.chunk_metas.len()
            || array.pages.len() != other.pages.len()
        {
            return false;
        }
        for (a, b) in array.chunk_metas.iter().zip(&other.chunk_metas) {
            if !a.array_eq(b, precision) {
                return false;
            }
        }
        for (a, b) in array.pages.iter().zip(&other.pages) {
            if !a.array_eq(b, precision) {
                return false;
            }
        }
        true
    }

    fn nbuffers(array: &PcoArray) -> usize {
        array.chunk_metas.len() + array.pages.len()
    }

    /// Buffer layout: all chunk-metadata buffers first, then all page buffers.
    /// `build` relies on this exact ordering when reconstructing the array.
    fn buffer(array: &PcoArray, idx: usize) -> BufferHandle {
        if idx < array.chunk_metas.len() {
            BufferHandle::new_host(array.chunk_metas[idx].clone())
        } else {
            let page_idx = idx - array.chunk_metas.len();
            BufferHandle::new_host(array.pages[page_idx].clone())
        }
    }

    fn buffer_name(array: &PcoArray, idx: usize) -> Option<String> {
        if idx < array.chunk_metas.len() {
            Some(format!("chunk_meta_{idx}"))
        } else {
            Some(format!("page_{}", idx - array.chunk_metas.len()))
        }
    }

    fn nchildren(array: &PcoArray) -> usize {
        validity_nchildren(&array.unsliced_validity)
    }

    /// The only possible child is the validity array.
    /// NOTE(review): `idx` is not range-checked here — any index returns the
    /// validity child when one exists; the panic only fires when there is no
    /// child at all. Confirm callers never pass idx > 0 with a child present.
    fn child(array: &PcoArray, idx: usize) -> ArrayRef {
        validity_to_child(&array.unsliced_validity, array.unsliced_n_rows)
            .unwrap_or_else(|| vortex_panic!("PcoArray child index {idx} out of bounds"))
    }

    fn child_name(_array: &PcoArray, idx: usize) -> String {
        match idx {
            0 => "validity".to_string(),
            _ => vortex_panic!("PcoArray child_name index {idx} out of bounds"),
        }
    }

    fn metadata(array: &PcoArray) -> VortexResult<Self::Metadata> {
        Ok(ProstMetadata(array.metadata.clone()))
    }

    /// Metadata round-trips through protobuf via prost.
    fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
        Ok(Some(metadata.0.encode_to_vec()))
    }

    fn deserialize(
        bytes: &[u8],
        _dtype: &DType,
        _len: usize,
        _buffers: &[BufferHandle],
        _session: &VortexSession,
    ) -> VortexResult<Self::Metadata> {
        Ok(ProstMetadata(PcoMetadata::decode(bytes)?))
    }

    /// Reconstructs a `PcoArray` from serialized parts, validating that buffer
    /// counts match the chunk/page counts recorded in the metadata.
    fn build(
        dtype: &DType,
        len: usize,
        metadata: &Self::Metadata,
        buffers: &[BufferHandle],
        children: &dyn ArrayChildren,
    ) -> VortexResult<PcoArray> {
        // Zero children: validity is implied by the dtype's nullability.
        // One child: an explicit validity array.
        let validity = if children.is_empty() {
            Validity::from(dtype.nullability())
        } else if children.len() == 1 {
            let validity = children.get(0, &Validity::DTYPE, len)?;
            Validity::Array(validity)
        } else {
            vortex_bail!("PcoArray expected 0 or 1 child, got {}", children.len());
        };

        // Buffers are laid out as [chunk metas..., pages...] (see `buffer`).
        vortex_ensure!(buffers.len() >= metadata.0.chunks.len());
        let chunk_metas = buffers[..metadata.0.chunks.len()]
            .iter()
            .map(|b| b.clone().try_to_host_sync())
            .collect::<VortexResult<Vec<_>>>()?;
        let pages = buffers[metadata.0.chunks.len()..]
            .iter()
            .map(|b| b.clone().try_to_host_sync())
            .collect::<VortexResult<Vec<_>>>()?;

        // Every page recorded in the metadata must have a backing buffer.
        let expected_n_pages = metadata
            .0
            .chunks
            .iter()
            .map(|info| info.pages.len())
            .sum::<usize>();
        vortex_ensure!(pages.len() == expected_n_pages);

        Ok(PcoArray::new(
            chunk_metas,
            pages,
            dtype.clone(),
            metadata.0.clone(),
            len,
            validity,
        ))
    }

    /// Replaces the validity child (or reverts to nullability-derived validity
    /// when no child is supplied).
    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
        vortex_ensure!(
            children.len() <= 1,
            "PcoArray expects 0 or 1 children, got {}",
            children.len()
        );

        if children.is_empty() {
            array.unsliced_validity = Validity::from(array.dtype.nullability());
        } else {
            array.unsliced_validity =
                Validity::Array(children.into_iter().next().vortex_expect("validity child"));
        }

        Ok(())
    }

    /// Execution fully decompresses the slice into a canonical primitive array.
    fn execute(array: Arc<Self::Array>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
        Ok(ExecutionResult::done(array.decompress(ctx)?.into_array()))
    }

    /// Delegates parent-reduction decisions to this crate's rule set.
    fn reduce_parent(
        array: &Self::Array,
        parent: &ArrayRef,
        child_idx: usize,
    ) -> VortexResult<Option<ArrayRef>> {
        crate::rules::RULES.evaluate(array, parent, child_idx)
    }
}
285
286pub(crate) fn number_type_from_dtype(dtype: &DType) -> NumberType {
287 let ptype = dtype.as_ptype();
288 match ptype {
289 PType::F16 => NumberType::F16,
290 PType::F32 => NumberType::F32,
291 PType::F64 => NumberType::F64,
292 PType::I16 => NumberType::I16,
293 PType::I32 => NumberType::I32,
294 PType::I64 => NumberType::I64,
295 PType::U16 => NumberType::U16,
296 PType::U32 => NumberType::U32,
297 PType::U64 => NumberType::U64,
298 _ => unreachable!("PType not supported by Pco: {:?}", ptype),
299 }
300}
301
302fn collect_valid(parray: &PrimitiveArray) -> VortexResult<PrimitiveArray> {
303 let mask = parray.validity_mask()?;
304 Ok(parray.clone().into_array().filter(mask)?.to_primitive())
305}
306
307pub(crate) fn vortex_err_from_pco(err: PcoError) -> VortexError {
308 use pco::errors::ErrorKind::*;
309 match err.kind {
310 Io(io_kind) => VortexError::from(std::io::Error::new(io_kind, err.message)),
311 InvalidArgument => vortex_err!(InvalidArgument: "{}", err.message),
312 other => vortex_err!("Pco {:?} error: {}", other, err.message),
313 }
314}
315
/// Marker type carrying the Pco encoding's vtable implementation.
#[derive(Clone, Debug)]
pub struct Pco;

impl Pco {
    /// Unique identifier for this encoding.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.pco");
}
322
/// An array compressed with the Pco (pcodec) numerical codec.
///
/// Data is stored as a wrapped Pco file: one metadata buffer per chunk plus
/// compressed page buffers. Slicing is lazy — `slice_start`/`slice_stop` index
/// into the unsliced row range without touching the compressed buffers.
#[derive(Clone, Debug)]
pub struct PcoArray {
    /// One serialized Pco chunk-metadata buffer per chunk, in chunk order.
    pub(crate) chunk_metas: Vec<ByteBuffer>,
    /// Compressed page buffers for all chunks, concatenated in chunk order.
    pub(crate) pages: Vec<ByteBuffer>,
    /// Pco file header plus per-chunk/per-page value counts.
    pub(crate) metadata: PcoMetadata,
    dtype: DType,
    /// Validity of the full (unsliced) array; only valid values are compressed.
    pub(crate) unsliced_validity: Validity,
    /// Row count of the full (unsliced) array.
    unsliced_n_rows: usize,
    /// Cached statistics; reset whenever the slice changes (see `_slice`).
    stats_set: ArrayStats,
    /// Start row (inclusive) of the logical slice into the unsliced array.
    slice_start: usize,
    /// End row (exclusive) of the logical slice.
    slice_stop: usize,
}
335
impl PcoArray {
    /// Constructs a `PcoArray` covering the full row range `0..len`.
    pub fn new(
        chunk_metas: Vec<ByteBuffer>,
        pages: Vec<ByteBuffer>,
        dtype: DType,
        metadata: PcoMetadata,
        len: usize,
        validity: Validity,
    ) -> Self {
        Self {
            chunk_metas,
            pages,
            metadata,
            dtype,
            unsliced_validity: validity,
            unsliced_n_rows: len,
            stats_set: Default::default(),
            slice_start: 0,
            slice_stop: len,
        }
    }

    /// Compresses a primitive array using the default chunk size.
    ///
    /// `level` is the Pco compression level; `values_per_page == 0` means one
    /// page per chunk (see `from_primitive_with_values_per_chunk`).
    pub fn from_primitive(
        parray: &PrimitiveArray,
        level: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        Self::from_primitive_with_values_per_chunk(parray, level, VALUES_PER_CHUNK, values_per_page)
    }

    /// Compresses `parray` into Pco chunks of `values_per_chunk` values, each
    /// split into pages of at most `values_per_page` values.
    ///
    /// Only valid (non-null) values are compressed; validity is stored
    /// separately and re-applied on decompression.
    pub(crate) fn from_primitive_with_values_per_chunk(
        parray: &PrimitiveArray,
        level: usize,
        values_per_chunk: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        let number_type = number_type_from_dtype(parray.dtype());
        // A zero page size means "don't split": one page per chunk.
        let values_per_page = if values_per_page == 0 {
            values_per_chunk
        } else {
            values_per_page
        };

        let chunk_config = ChunkConfig::default()
            .with_compression_level(level)
            .with_paging_spec(PagingSpec::EqualPagesUpTo(values_per_page));

        // Drop null slots up front; only valid values are fed to the compressor.
        let values = collect_valid(parray)?;
        let n_values = values.len();

        let fc = FileCompressor::default();
        let mut header = vec![];
        fc.write_header(&mut header).map_err(vortex_err_from_pco)?;

        let mut chunk_meta_buffers = vec![];
        let mut chunk_infos = vec![];
        let mut page_buffers = vec![];
        for chunk_start in (0..n_values).step_by(values_per_chunk) {
            let chunk_end = cmp::min(n_values, chunk_start + values_per_chunk);
            let mut cc = match_number_enum!(
                number_type,
                NumberType<T> => {
                    let values = values.to_buffer::<T>();
                    let chunk = &values.as_slice()[chunk_start..chunk_end];
                    fc
                        .chunk_compressor(chunk, &chunk_config)
                        .map_err(vortex_err_from_pco)?
                }
            );

            // Chunk metadata goes into its own buffer, separate from page data.
            let mut chunk_meta_buffer = ByteBufferMut::with_capacity(cc.meta_size_hint());
            cc.write_meta(&mut chunk_meta_buffer)
                .map_err(vortex_err_from_pco)?;
            chunk_meta_buffers.push(chunk_meta_buffer.freeze());

            // Write each page and record its value count, so decompression can
            // locate the pages overlapping a slice without decoding them.
            let mut page_infos = vec![];
            for (page_idx, page_n_values) in cc.n_per_page().into_iter().enumerate() {
                let mut page = ByteBufferMut::with_capacity(cc.page_size_hint(page_idx));
                cc.write_page(page_idx, &mut page)
                    .map_err(vortex_err_from_pco)?;
                page_buffers.push(page.freeze());
                page_infos.push(PcoPageInfo {
                    n_values: u32::try_from(page_n_values)?,
                });
            }
            chunk_infos.push(PcoChunkInfo { pages: page_infos })
        }

        let metadata = PcoMetadata {
            header,
            chunks: chunk_infos,
        };
        Ok(PcoArray::new(
            chunk_meta_buffers,
            page_buffers,
            parray.dtype().clone(),
            metadata,
            parray.len(),
            parray.validity().clone(),
        ))
    }

    /// Compresses any primitive-encoded array; errors for other encodings.
    pub fn from_array(array: ArrayRef, level: usize, nums_per_page: usize) -> VortexResult<Self> {
        if let Some(parray) = array.as_opt::<Primitive>() {
            Self::from_primitive(parray, level, nums_per_page)
        } else {
            Err(vortex_err!("Pco can only encode primitive arrays"))
        }
    }

    /// Decompresses the current slice back into a canonical `PrimitiveArray`,
    /// re-attaching the correspondingly sliced validity.
    pub fn decompress(&self, ctx: &mut ExecutionCtx) -> VortexResult<PrimitiveArray> {
        let number_type = number_type_from_dtype(&self.dtype);
        let values_byte_buffer = match_number_enum!(
            number_type,
            NumberType<T> => {
                self.decompress_values_typed::<T>(ctx)?
            }
        );

        Ok(PrimitiveArray::from_values_byte_buffer(
            values_byte_buffer,
            self.dtype.as_ptype(),
            self.unsliced_validity
                .slice(self.slice_start..self.slice_stop)?,
            self.slice_stop - self.slice_start,
        ))
    }

    /// Decompresses exactly the valid values overlapping the current slice.
    ///
    /// Pages entirely before or after the slice are skipped; overlapping pages
    /// are decompressed whole and the result is trimmed to the slice at the end.
    fn decompress_values_typed<T: Number>(
        &self,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<ByteBuffer> {
        // Convert row indices to value indices: count how many valid values
        // precede slice_start and slice_stop respectively (nulls store no value).
        let slice_value_indices = self
            .unsliced_validity
            .execute_mask(self.unsliced_n_rows, ctx)?
            .valid_counts_for_indices(&[self.slice_start, self.slice_stop]);
        let slice_value_start = slice_value_indices[0];
        let slice_value_stop = slice_value_indices[1];
        let slice_n_values = slice_value_stop - slice_value_start;

        let (fd, _) =
            FileDecompressor::new(self.metadata.header.as_slice()).map_err(vortex_err_from_pco)?;
        let mut decompressed_values = BufferMut::<T>::with_capacity(slice_n_values);
        let mut page_idx = 0;
        // Value index of the first value of the current page.
        let mut page_value_start = 0;
        // Total values in pages skipped because they end before the slice.
        let mut n_skipped_values = 0;
        for (chunk_info, chunk_meta) in self.metadata.chunks.iter().zip(&self.chunk_metas) {
            // Built lazily: only if some page of this chunk overlaps the slice.
            let mut chunk_decompressor: Option<ChunkDecompressor<T>> = None;
            for page_info in &chunk_info.pages {
                let page_n_values = page_info.n_values as usize;
                let page_value_stop = page_value_start + page_n_values;

                // This and all later pages start at or past the slice end.
                if page_value_start >= slice_value_stop {
                    break;
                }

                if page_value_stop > slice_value_start {
                    // Page overlaps the slice: decompress it in full, directly
                    // into the output buffer.
                    let old_len = decompressed_values.len();
                    let new_len = old_len + page_n_values;
                    decompressed_values.reserve(page_n_values);
                    // SAFETY: capacity for `page_n_values` extra elements was
                    // just reserved, and `pd.read` below fills exactly
                    // `[old_len..new_len]` before the data is read. Assumes any
                    // bit pattern is a valid `T: Number` (plain numeric types)
                    // — TODO confirm against pco's `Number` contract.
                    unsafe {
                        decompressed_values.set_len(new_len);
                    }
                    let page: &[u8] = self.pages[page_idx].as_ref();

                    let mut cd = match chunk_decompressor.take() {
                        Some(d) => d,
                        None => {
                            let (new_cd, _) = fd
                                .chunk_decompressor(chunk_meta.as_ref())
                                .map_err(vortex_err_from_pco)?;
                            new_cd
                        }
                    };

                    let mut pd = cd
                        .page_decompressor(page, page_n_values)
                        .map_err(vortex_err_from_pco)?;
                    pd.read(&mut decompressed_values[old_len..new_len])
                        .map_err(vortex_err_from_pco)?;

                    chunk_decompressor = Some(cd);
                } else {
                    // Page ends before the slice begins: skip it entirely.
                    n_skipped_values += page_n_values;
                }

                page_value_start = page_value_stop;
                page_idx += 1;
            }
        }

        // The first overlapping page may carry values that precede the slice;
        // trim them, then take exactly `slice_n_values` values.
        let value_offset = slice_value_start - n_skipped_values;
        Ok(decompressed_values
            .freeze()
            .slice(value_offset..value_offset + slice_n_values)
            .into_byte_buffer())
    }

    /// Lazy slice: adjusts slice bounds without touching compressed data.
    /// `start`/`stop` are relative to the current slice. Stats are reset since
    /// they describe the previous range.
    pub(crate) fn _slice(&self, start: usize, stop: usize) -> Self {
        PcoArray {
            slice_start: self.slice_start + start,
            slice_stop: self.slice_start + stop,
            stats_set: Default::default(),
            ..self.clone()
        }
    }

    pub(crate) fn dtype(&self) -> &DType {
        &self.dtype
    }

    pub(crate) fn slice_start(&self) -> usize {
        self.slice_start
    }

    pub(crate) fn slice_stop(&self) -> usize {
        self.slice_stop
    }

    pub(crate) fn unsliced_n_rows(&self) -> usize {
        self.unsliced_n_rows
    }
}
568
impl ValiditySliceHelper for PcoArray {
    /// Exposes the unsliced validity together with the slice bounds, so that
    /// `ValidityVTableFromValiditySliceHelper` (this encoding's ValidityVTable)
    /// can derive the sliced validity.
    fn unsliced_validity_and_slice(&self) -> (&Validity, usize, usize) {
        (&self.unsliced_validity, self.slice_start, self.slice_stop)
    }
}
574
575impl OperationsVTable<Pco> for Pco {
576 fn scalar_at(array: &PcoArray, index: usize) -> VortexResult<Scalar> {
577 let mut ctx = LEGACY_SESSION.create_execution_ctx();
578 array
579 ._slice(index, index + 1)
580 .decompress(&mut ctx)?
581 .scalar_at(0)
582 }
583}
584
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use crate::PcoArray;

    /// Round-trips a nullable primitive array through Pco compression and
    /// checks that slicing the compressed array matches the original.
    #[test]
    fn test_slice_nullable() {
        let values = PrimitiveArray::new(
            buffer![10u32, 20, 30, 40, 50, 60],
            Validity::from_iter([false, true, true, true, true, false]),
        );
        let pco = PcoArray::from_primitive(&values, 0, 128).unwrap();
        // Full-array round trip: the nulls at both ends must be preserved.
        assert_arrays_eq!(
            pco,
            PrimitiveArray::from_option_iter([
                None,
                Some(20u32),
                Some(30),
                Some(40),
                Some(50),
                None
            ])
        );

        // Slicing off both null edges leaves exactly the valid middle values.
        let sliced = pco.slice(1..5).unwrap();
        let expected =
            PrimitiveArray::from_option_iter([Some(20u32), Some(30), Some(40), Some(50)])
                .into_array();
        assert_arrays_eq!(sliced, expected);
    }
}
622}