1use std::cmp;
5use std::fmt::Debug;
6use std::fmt::Display;
7use std::fmt::Formatter;
8use std::hash::Hash;
9use std::hash::Hasher;
10
11use pco::ChunkConfig;
12use pco::PagingSpec;
13use pco::data_types::Number;
14use pco::data_types::NumberType;
15use pco::errors::PcoError;
16use pco::match_number_enum;
17use pco::wrapped::ChunkDecompressor;
18use pco::wrapped::FileCompressor;
19use pco::wrapped::FileDecompressor;
20use prost::Message;
21use vortex_array::Array;
22use vortex_array::ArrayEq;
23use vortex_array::ArrayHash;
24use vortex_array::ArrayId;
25use vortex_array::ArrayParts;
26use vortex_array::ArrayRef;
27use vortex_array::ArrayView;
28use vortex_array::ExecutionCtx;
29use vortex_array::ExecutionResult;
30use vortex_array::IntoArray;
31use vortex_array::Precision;
32use vortex_array::arrays::Primitive;
33use vortex_array::arrays::PrimitiveArray;
34use vortex_array::buffer::BufferHandle;
35use vortex_array::dtype::DType;
36use vortex_array::dtype::PType;
37use vortex_array::dtype::half;
38use vortex_array::scalar::Scalar;
39use vortex_array::serde::ArrayChildren;
40use vortex_array::validity::Validity;
41use vortex_array::vtable::OperationsVTable;
42use vortex_array::vtable::VTable;
43use vortex_array::vtable::ValidityVTable;
44use vortex_array::vtable::child_to_validity;
45use vortex_array::vtable::validity_to_child;
46use vortex_buffer::BufferMut;
47use vortex_buffer::ByteBuffer;
48use vortex_buffer::ByteBufferMut;
49use vortex_error::VortexError;
50use vortex_error::VortexResult;
51use vortex_error::vortex_bail;
52use vortex_error::vortex_ensure;
53use vortex_error::vortex_err;
54use vortex_session::VortexSession;
55use vortex_session::registry::CachedId;
56
57use crate::PcoChunkInfo;
58use crate::PcoMetadata;
59use crate::PcoPageInfo;
60
/// Default number of values compressed into a single Pco chunk; reuses pco's
/// default maximum page size so a chunk defaults to one full page.
const VALUES_PER_CHUNK: usize = pco::DEFAULT_MAX_PAGE_N;

/// A Vortex array whose primitive values are compressed with Pco.
pub type PcoArray = Array<Pco>;
83
84impl ArrayHash for PcoData {
85 fn array_hash<H: Hasher>(&self, state: &mut H, precision: Precision) {
86 self.unsliced_n_rows.hash(state);
87 self.slice_start.hash(state);
88 self.slice_stop.hash(state);
89 for chunk_meta in &self.chunk_metas {
91 chunk_meta.array_hash(state, precision);
92 }
93 for page in &self.pages {
94 page.array_hash(state, precision);
95 }
96 }
97}
98
99impl ArrayEq for PcoData {
100 fn array_eq(&self, other: &Self, precision: Precision) -> bool {
101 if self.unsliced_n_rows != other.unsliced_n_rows
102 || self.slice_start != other.slice_start
103 || self.slice_stop != other.slice_stop
104 || self.chunk_metas.len() != other.chunk_metas.len()
105 || self.pages.len() != other.pages.len()
106 {
107 return false;
108 }
109 for (a, b) in self.chunk_metas.iter().zip(&other.chunk_metas) {
110 if !a.array_eq(b, precision) {
111 return false;
112 }
113 }
114 for (a, b) in self.pages.iter().zip(&other.pages) {
115 if !a.array_eq(b, precision) {
116 return false;
117 }
118 }
119 true
120 }
121}
122
impl VTable for Pco {
    type ArrayData = PcoData;

    type OperationsVTable = Self;
    type ValidityVTable = Self;

    fn id(&self) -> ArrayId {
        // Cached so the id string is interned only once.
        static ID: CachedId = CachedId::new("vortex.pco");
        *ID
    }

    /// Validates constructed/deserialized data against the dtype, length, and
    /// the validity child stored in slot 0.
    fn validate(
        &self,
        data: &PcoData,
        dtype: &DType,
        len: usize,
        slots: &[Option<ArrayRef>],
    ) -> VortexResult<()> {
        // Slot 0 is the validity child (see SLOT_NAMES); assumes `slots` has
        // NUM_SLOTS entries — TODO confirm callers guarantee this.
        let validity = child_to_validity(slots[0].as_ref(), dtype.nullability());
        data.validate(dtype, len, &validity)
    }

    // Buffer layout: all chunk-meta buffers first, followed by all page buffers.
    fn nbuffers(array: ArrayView<'_, Self>) -> usize {
        array.chunk_metas.len() + array.pages.len()
    }

    fn buffer(array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
        if idx < array.chunk_metas.len() {
            BufferHandle::new_host(array.chunk_metas[idx].clone())
        } else {
            // Indices past the chunk metas address the page buffers.
            let page_idx = idx - array.chunk_metas.len();
            BufferHandle::new_host(array.pages[page_idx].clone())
        }
    }

    fn buffer_name(array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
        if idx < array.chunk_metas.len() {
            Some(format!("chunk_meta_{idx}"))
        } else {
            Some(format!("page_{}", idx - array.chunk_metas.len()))
        }
    }

    /// Serializes only the protobuf metadata; the compressed buffers are
    /// carried separately via `buffer`/`nbuffers`.
    fn serialize(
        array: ArrayView<'_, Self>,
        _session: &VortexSession,
    ) -> VortexResult<Option<Vec<u8>>> {
        Ok(Some(array.metadata.clone().encode_to_vec()))
    }

    /// Reconstructs a Pco array from serialized metadata, its buffers (chunk
    /// metas then pages), and an optional validity child.
    fn deserialize(
        &self,
        dtype: &DType,
        len: usize,
        metadata: &[u8],
        buffers: &[BufferHandle],
        children: &dyn ArrayChildren,
        _session: &VortexSession,
    ) -> VortexResult<ArrayParts<Self>> {
        let metadata = PcoMetadata::decode(metadata)?;
        // 0 children => validity is implied by nullability; 1 child => an
        // explicit validity array; anything else is malformed.
        let validity = if children.is_empty() {
            Validity::from(dtype.nullability())
        } else if children.len() == 1 {
            let validity = children.get(0, &Validity::DTYPE, len)?;
            Validity::Array(validity)
        } else {
            vortex_bail!("PcoArray expected 0 or 1 child, got {}", children.len());
        };

        // Split buffers by the layout convention: chunk metas first, pages after.
        vortex_ensure!(buffers.len() >= metadata.chunks.len());
        let chunk_metas = buffers[..metadata.chunks.len()]
            .iter()
            .map(|b| b.clone().try_to_host_sync())
            .collect::<VortexResult<Vec<_>>>()?;
        let pages = buffers[metadata.chunks.len()..]
            .iter()
            .map(|b| b.clone().try_to_host_sync())
            .collect::<VortexResult<Vec<_>>>()?;

        // The page buffers must match the page count declared in the metadata.
        let expected_n_pages = metadata
            .chunks
            .iter()
            .map(|info| info.pages.len())
            .sum::<usize>();
        vortex_ensure!(pages.len() == expected_n_pages);

        let slots = vec![validity_to_child(&validity, len)];
        let data = PcoData::new(chunk_metas, pages, dtype.as_ptype(), metadata, len);
        Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
    }

    fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
        SLOT_NAMES[idx].to_string()
    }

    /// Executes the array by fully decompressing it into a primitive array.
    fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
        // The stored validity covers all unsliced rows; decompress slices it.
        let unsliced_validity = child_to_validity(
            array.as_ref().slots()[0].as_ref(),
            array.dtype().nullability(),
        );
        Ok(ExecutionResult::done(
            array
                .data()
                .decompress(&unsliced_validity, ctx)?
                .into_array(),
        ))
    }

    /// Delegates parent-reduction to the crate's rewrite rules.
    fn reduce_parent(
        array: ArrayView<'_, Self>,
        parent: &ArrayRef,
        child_idx: usize,
    ) -> VortexResult<Option<ArrayRef>> {
        crate::rules::RULES.evaluate(array, parent, child_idx)
    }
}
239
/// Maps a Vortex [`DType`] to the corresponding Pco [`NumberType`].
///
/// Panics (via [`number_type_from_ptype`]) if the dtype's ptype has no Pco
/// equivalent.
pub(crate) fn number_type_from_dtype(dtype: &DType) -> NumberType {
    number_type_from_ptype(dtype.as_ptype())
}
243
244pub(crate) fn number_type_from_ptype(ptype: PType) -> NumberType {
245 match ptype {
246 PType::F16 => NumberType::F16,
247 PType::F32 => NumberType::F32,
248 PType::F64 => NumberType::F64,
249 PType::I16 => NumberType::I16,
250 PType::I32 => NumberType::I32,
251 PType::I64 => NumberType::I64,
252 PType::U16 => NumberType::U16,
253 PType::U32 => NumberType::U32,
254 PType::U64 => NumberType::U64,
255 _ => unreachable!("PType not supported by Pco: {:?}", ptype),
256 }
257}
258
/// Filters a primitive array down to only its valid (non-null) elements.
///
/// Pco compresses just the valid values; the caller keeps the validity mask
/// around separately to reconstruct row positions.
fn collect_valid(
    parray: ArrayView<'_, Primitive>,
    ctx: &mut ExecutionCtx,
) -> VortexResult<PrimitiveArray> {
    let mask = parray
        .array()
        .validity()?
        .execute_mask(parray.array().len(), ctx)?;
    let result = parray
        .array()
        .filter(mask)?
        .execute::<PrimitiveArray>(ctx)?;
    Ok(result)
}
273
274pub(crate) fn vortex_err_from_pco(err: PcoError) -> VortexError {
275 use pco::errors::ErrorKind::*;
276 match err.kind {
277 Io(io_kind) => VortexError::from(std::io::Error::new(io_kind, err.message)),
278 InvalidArgument => vortex_err!(InvalidArgument: "{}", err.message),
279 other => vortex_err!("Pco {:?} error: {}", other, err.message),
280 }
281}
282
/// Marker type implementing the Pco encoding's vtable.
#[derive(Clone, Debug)]
pub struct Pco;
285
impl Pco {
    /// Builds a [`PcoArray`] from already-compressed data, validating `data`
    /// against `dtype` and `validity` first.
    pub(crate) fn try_new(
        dtype: DType,
        data: PcoData,
        validity: Validity,
    ) -> VortexResult<PcoArray> {
        let len = data.len();
        data.validate(&dtype, len, &validity)?;
        // The validity child spans all unsliced rows, not just the slice.
        let slots = vec![validity_to_child(&validity, data.unsliced_n_rows())];
        // SAFETY: `data.validate` above checked the invariants (ptype match,
        // slice bounds, buffer counts) that `from_parts_unchecked` skips.
        Ok(unsafe {
            Array::from_parts_unchecked(ArrayParts::new(Pco, dtype, len, data).with_slots(slots))
        })
    }

    /// Compresses a primitive array with Pco at the given compression `level`,
    /// splitting pages of at most `values_per_page` values.
    pub fn from_primitive(
        parray: ArrayView<'_, Primitive>,
        level: usize,
        values_per_page: usize,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<PcoArray> {
        let dtype = parray.dtype().clone();
        let validity = parray.validity()?;
        let data = PcoData::from_primitive(parray, level, values_per_page, ctx)?;
        Self::try_new(dtype, data, validity)
    }
}
313
/// Number of child slots in a Pco array: just the validity child.
pub(super) const NUM_SLOTS: usize = 1;
/// Display names for the child slots, indexed by slot position.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["validity"];
317
/// Owned data for a Pco array: the raw compressed buffers plus the metadata
/// needed to locate chunks and pages, and a lazily-applied slice range.
#[derive(Clone, Debug)]
pub struct PcoData {
    // One serialized Pco chunk-metadata buffer per chunk.
    pub(crate) chunk_metas: Vec<ByteBuffer>,
    // Compressed page buffers for all chunks, concatenated in chunk order.
    pub(crate) pages: Vec<ByteBuffer>,
    // File header plus per-chunk / per-page value counts.
    pub(crate) metadata: PcoMetadata,
    // Primitive type of the decompressed values.
    ptype: PType,
    // Total number of rows before any slicing.
    unsliced_n_rows: usize,
    // Active slice start (inclusive), in unsliced row coordinates.
    slice_start: usize,
    // Active slice end (exclusive), in unsliced row coordinates.
    slice_stop: usize,
}
328
329impl Display for PcoData {
330 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
331 write!(
332 f,
333 "ptype: {}, nrows: {}, slice: {}..{}",
334 self.ptype, self.unsliced_n_rows, self.slice_start, self.slice_stop
335 )
336 }
337}
338
impl PcoData {
    /// Checks all internal invariants against the expected `dtype`, logical
    /// `len`, and `validity`, returning an error on the first mismatch.
    pub fn validate(&self, dtype: &DType, len: usize, validity: &Validity) -> VortexResult<()> {
        // Panics (rather than erroring) if the ptype has no Pco equivalent.
        let _ = number_type_from_ptype(self.ptype);
        vortex_ensure!(
            dtype.as_ptype() == self.ptype,
            "expected ptype {}, got {}",
            self.ptype,
            dtype.as_ptype()
        );
        vortex_ensure!(
            dtype.nullability() == validity.nullability(),
            "expected nullability {}, got {}",
            validity.nullability(),
            dtype.nullability()
        );
        vortex_ensure!(
            self.slice_start <= self.slice_stop && self.slice_stop <= self.unsliced_n_rows,
            "invalid slice range {}..{} for {} rows",
            self.slice_start,
            self.slice_stop,
            self.unsliced_n_rows
        );
        vortex_ensure!(
            self.slice_stop - self.slice_start == len,
            "expected len {len}, got {}",
            self.slice_stop - self.slice_start
        );
        // Validity is stored unsliced, so it must cover all unsliced rows.
        if let Some(validity_len) = validity.maybe_len() {
            vortex_ensure!(
                validity_len == self.unsliced_n_rows,
                "expected validity len {}, got {}",
                self.unsliced_n_rows,
                validity_len
            );
        }
        vortex_ensure!(
            self.chunk_metas.len() == self.metadata.chunks.len(),
            "expected {} chunk metas, got {}",
            self.metadata.chunks.len(),
            self.chunk_metas.len()
        );
        vortex_ensure!(
            self.pages.len()
                == self
                    .metadata
                    .chunks
                    .iter()
                    .map(|chunk| chunk.pages.len())
                    .sum::<usize>(),
            "page count does not match metadata"
        );
        Ok(())
    }

    /// Constructs unsliced data (slice covers all `len` rows). Invariants are
    /// not checked here; see [`PcoData::validate`].
    pub fn new(
        chunk_metas: Vec<ByteBuffer>,
        pages: Vec<ByteBuffer>,
        ptype: PType,
        metadata: PcoMetadata,
        len: usize,
    ) -> Self {
        Self {
            chunk_metas,
            pages,
            metadata,
            ptype,
            unsliced_n_rows: len,
            slice_start: 0,
            slice_stop: len,
        }
    }

    /// Compresses a primitive array using the default chunk size
    /// ([`VALUES_PER_CHUNK`]).
    pub fn from_primitive(
        parray: ArrayView<'_, Primitive>,
        level: usize,
        values_per_page: usize,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Self> {
        Self::from_primitive_with_values_per_chunk(
            parray,
            level,
            VALUES_PER_CHUNK,
            values_per_page,
            ctx,
        )
    }

    /// Compresses a primitive array into Pco chunks of `values_per_chunk`
    /// valid values, each split into pages of at most `values_per_page`.
    ///
    /// Only valid (non-null) values are compressed; nulls are reconstructed
    /// from the validity at decompression time.
    pub(crate) fn from_primitive_with_values_per_chunk(
        parray: ArrayView<'_, Primitive>,
        level: usize,
        values_per_chunk: usize,
        values_per_page: usize,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Self> {
        let number_type = number_type_from_dtype(parray.dtype());
        // values_per_page == 0 means "one page per chunk".
        let values_per_page = if values_per_page == 0 {
            values_per_chunk
        } else {
            values_per_page
        };

        let chunk_config = ChunkConfig::default()
            .with_compression_level(level)
            .with_paging_spec(PagingSpec::EqualPagesUpTo(values_per_page));

        // Drop nulls: pco sees only the valid values.
        let values = collect_valid(parray, ctx)?;
        let n_values = values.len();

        // The wrapped-format header is written once and stored in metadata.
        let fc = FileCompressor::default();
        let mut header = vec![];
        fc.write_header(&mut header).map_err(vortex_err_from_pco)?;

        let mut chunk_meta_buffers = vec![];
        let mut chunk_infos = vec![];
        let mut page_buffers = vec![];
        for chunk_start in (0..n_values).step_by(values_per_chunk) {
            let chunk_end = cmp::min(n_values, chunk_start + values_per_chunk);
            // Monomorphize on the concrete number type for this chunk's slice.
            let mut cc = match_number_enum!(
                number_type,
                NumberType<T> => {
                    let values = values.to_buffer::<T>();
                    let chunk = &values.as_slice()[chunk_start..chunk_end];
                    fc
                        .chunk_compressor(chunk, &chunk_config)
                        .map_err(vortex_err_from_pco)?
                }
            );

            let mut chunk_meta_buffer = ByteBufferMut::with_capacity(cc.meta_size_hint());
            cc.write_meta(&mut chunk_meta_buffer)
                .map_err(vortex_err_from_pco)?;
            chunk_meta_buffers.push(chunk_meta_buffer.freeze());

            // Write each page and record its value count so decompression can
            // skip pages outside a slice without reading them.
            let mut page_infos = vec![];
            for (page_idx, page_n_values) in cc.n_per_page().into_iter().enumerate() {
                let mut page = ByteBufferMut::with_capacity(cc.page_size_hint(page_idx));
                cc.write_page(page_idx, &mut page)
                    .map_err(vortex_err_from_pco)?;
                page_buffers.push(page.freeze());
                page_infos.push(PcoPageInfo {
                    n_values: u32::try_from(page_n_values)?,
                });
            }
            chunk_infos.push(PcoChunkInfo { pages: page_infos })
        }

        let metadata = PcoMetadata {
            header,
            chunks: chunk_infos,
        };
        Ok(PcoData::new(
            chunk_meta_buffers,
            page_buffers,
            parray.dtype().as_ptype(),
            metadata,
            // Row count includes nulls, even though only valid values were
            // compressed.
            parray.len(),
        ))
    }

    /// Compresses an arbitrary array, failing unless it is a primitive array.
    pub fn from_array(
        array: ArrayRef,
        level: usize,
        nums_per_page: usize,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Self> {
        let parray = array.try_downcast::<Primitive>().map_err(|a| {
            vortex_err!(
                "Pco can only encode primitive arrays, got {}",
                a.encoding_id()
            )
        })?;
        Self::from_primitive(parray.as_view(), level, nums_per_page, ctx)
    }

    /// Decompresses the current slice into a [`PrimitiveArray`], reattaching
    /// the (sliced) validity.
    pub fn decompress(
        &self,
        unsliced_validity: &Validity,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<PrimitiveArray> {
        let number_type = number_type_from_ptype(self.ptype);
        // Monomorphize the typed decompression on the concrete number type.
        let values_byte_buffer = match_number_enum!(
            number_type,
            NumberType<T> => {
                self.decompress_values_typed::<T>(unsliced_validity, ctx)?
            }
        );

        Ok(PrimitiveArray::from_values_byte_buffer(
            values_byte_buffer,
            self.ptype,
            unsliced_validity.slice(self.slice_start..self.slice_stop)?,
            self.slice_stop - self.slice_start,
        ))
    }

    /// Decompresses only the valid values that fall inside the current slice,
    /// returning them as a raw byte buffer.
    ///
    /// Pco stores only valid values, so row positions are first translated to
    /// value positions via the validity mask, and then only the pages that
    /// overlap the value range are decompressed.
    fn decompress_values_typed<T: Number>(
        &self,
        unsliced_validity: &Validity,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<ByteBuffer> {
        // Count valid values preceding slice_start/slice_stop to translate the
        // row-space slice into value-space bounds.
        let slice_value_indices = unsliced_validity
            .execute_mask(self.unsliced_n_rows, ctx)?
            .valid_counts_for_indices(&[self.slice_start, self.slice_stop]);
        let slice_value_start = slice_value_indices[0];
        let slice_value_stop = slice_value_indices[1];
        let slice_n_values = slice_value_stop - slice_value_start;

        let (fd, _) =
            FileDecompressor::new(self.metadata.header.as_slice()).map_err(vortex_err_from_pco)?;
        let mut decompressed_values = BufferMut::<T>::with_capacity(slice_n_values);
        let mut page_idx = 0;
        let mut page_value_start = 0;
        let mut n_skipped_values = 0;
        for (chunk_info, chunk_meta) in self.metadata.chunks.iter().zip(&self.chunk_metas) {
            // Created lazily so chunks with no overlapping pages never pay for
            // parsing their chunk metadata.
            let mut chunk_decompressor: Option<ChunkDecompressor<T>> = None;
            for page_info in &chunk_info.pages {
                let page_n_values = page_info.n_values as usize;
                let page_value_stop = page_value_start + page_n_values;

                if page_value_start >= slice_value_stop {
                    // Past the slice; all later pages (in every chunk) are too,
                    // so nothing further will be decompressed.
                    break;
                }

                if page_value_stop > slice_value_start {
                    // Page overlaps the slice: decompress it in full.
                    let old_len = decompressed_values.len();
                    let new_len = old_len + page_n_values;
                    decompressed_values.reserve(page_n_values);
                    // SAFETY: capacity for `new_len` elements was reserved just
                    // above; the uninitialized tail is handed to `pd.read`,
                    // which is expected to overwrite all `page_n_values` slots
                    // (pco page-read contract — TODO confirm).
                    unsafe {
                        decompressed_values.set_len(new_len);
                    }
                    let page: &[u8] = self.pages[page_idx].as_ref();

                    // Reuse the chunk decompressor across pages of one chunk.
                    let mut cd = match chunk_decompressor.take() {
                        Some(d) => d,
                        None => {
                            let (new_cd, _) = fd
                                .chunk_decompressor(chunk_meta.as_ref())
                                .map_err(vortex_err_from_pco)?;
                            new_cd
                        }
                    };

                    let mut pd = cd
                        .page_decompressor(page, page_n_values)
                        .map_err(vortex_err_from_pco)?;
                    pd.read(&mut decompressed_values[old_len..new_len])
                        .map_err(vortex_err_from_pco)?;

                    chunk_decompressor = Some(cd);
                } else {
                    // Entirely before the slice: skip, but track how many
                    // values were skipped to locate the slice start later.
                    n_skipped_values += page_n_values;
                }

                page_value_start = page_value_stop;
                page_idx += 1;
            }
        }

        // Pages are decompressed whole, so trim the leading values that
        // precede the slice start within the first overlapping page.
        let value_offset = slice_value_start - n_skipped_values;
        Ok(decompressed_values
            .freeze()
            .slice(value_offset..value_offset + slice_n_values)
            .into_byte_buffer())
    }

    /// Narrows the slice; `start`/`stop` are relative to the current slice.
    pub(crate) fn _slice(&self, start: usize, stop: usize) -> Self {
        PcoData {
            slice_start: self.slice_start + start,
            slice_stop: self.slice_start + stop,
            ..self.clone()
        }
    }

    /// Logical (sliced) row count.
    pub fn len(&self) -> usize {
        self.slice_stop - self.slice_start
    }

    /// True when the slice contains no rows.
    pub fn is_empty(&self) -> bool {
        self.slice_stop == self.slice_start
    }

    pub(crate) fn slice_start(&self) -> usize {
        self.slice_start
    }

    pub(crate) fn slice_stop(&self) -> usize {
        self.slice_stop
    }

    /// Row count before any slicing.
    pub(crate) fn unsliced_n_rows(&self) -> usize {
        self.unsliced_n_rows
    }
}
643
644impl ValidityVTable<Pco> for Pco {
645 fn validity(array: ArrayView<'_, Pco>) -> VortexResult<Validity> {
646 let unsliced_validity =
647 child_to_validity(array.slots()[0].as_ref(), array.dtype().nullability());
648 unsliced_validity.slice(array.slice_start()..array.slice_stop())
649 }
650}
651
impl OperationsVTable<Pco> for Pco {
    /// Extracts a single element by decompressing a one-row slice and reading
    /// its scalar value.
    fn scalar_at(
        array: ArrayView<'_, Pco>,
        index: usize,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Scalar> {
        let unsliced_validity =
            child_to_validity(array.slots()[0].as_ref(), array.dtype().nullability());
        // Narrow to the requested row first, so only the pages overlapping
        // that row are decompressed.
        array
            ._slice(index, index + 1)
            .decompress(&unsliced_validity, ctx)?
            .into_array()
            .execute_scalar(0, ctx)
    }
}
667
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use crate::Pco;

    /// Round-trips a nullable array through Pco compression, then verifies
    /// that slicing the compressed array matches slicing the original.
    #[test]
    fn test_slice_nullable() {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        // Nulls at both ends exercise the validity-aware value indexing.
        let values = PrimitiveArray::new(
            buffer![10u32, 20, 30, 40, 50, 60],
            Validity::from_iter([false, true, true, true, true, false]),
        );
        let pco = Pco::from_primitive(values.as_view(), 0, 128, &mut ctx).unwrap();
        // Full-array decompression preserves values and nulls.
        assert_arrays_eq!(
            pco,
            PrimitiveArray::from_option_iter([
                None,
                Some(20u32),
                Some(30),
                Some(40),
                Some(50),
                None
            ])
        );

        // Slicing off the null endpoints yields the all-valid middle.
        let sliced = pco.slice(1..5).unwrap();
        let expected =
            PrimitiveArray::from_option_iter([Some(20u32), Some(30), Some(40), Some(50)])
                .into_array();
        assert_arrays_eq!(sliced, expected);
    }
}