1use std::cmp;
5use std::fmt::Debug;
6use std::fmt::Display;
7use std::fmt::Formatter;
8use std::hash::Hash;
9use std::hash::Hasher;
10
11use pco::ChunkConfig;
12use pco::PagingSpec;
13use pco::data_types::Number;
14use pco::data_types::NumberType;
15use pco::errors::PcoError;
16use pco::match_number_enum;
17use pco::wrapped::ChunkDecompressor;
18use pco::wrapped::FileCompressor;
19use pco::wrapped::FileDecompressor;
20use prost::Message;
21use vortex_array::Array;
22use vortex_array::ArrayEq;
23use vortex_array::ArrayHash;
24use vortex_array::ArrayId;
25use vortex_array::ArrayParts;
26use vortex_array::ArrayRef;
27use vortex_array::ArrayView;
28use vortex_array::ExecutionCtx;
29use vortex_array::ExecutionResult;
30use vortex_array::IntoArray;
31use vortex_array::Precision;
32use vortex_array::arrays::Primitive;
33use vortex_array::arrays::PrimitiveArray;
34use vortex_array::buffer::BufferHandle;
35use vortex_array::dtype::DType;
36use vortex_array::dtype::PType;
37use vortex_array::dtype::half;
38use vortex_array::scalar::Scalar;
39use vortex_array::serde::ArrayChildren;
40use vortex_array::smallvec::smallvec;
41use vortex_array::validity::Validity;
42use vortex_array::vtable::OperationsVTable;
43use vortex_array::vtable::VTable;
44use vortex_array::vtable::ValidityVTable;
45use vortex_array::vtable::child_to_validity;
46use vortex_array::vtable::validity_to_child;
47use vortex_buffer::BufferMut;
48use vortex_buffer::ByteBuffer;
49use vortex_buffer::ByteBufferMut;
50use vortex_error::VortexError;
51use vortex_error::VortexResult;
52use vortex_error::vortex_bail;
53use vortex_error::vortex_ensure;
54use vortex_error::vortex_err;
55use vortex_session::VortexSession;
56use vortex_session::registry::CachedId;
57
58use crate::PcoChunkInfo;
59use crate::PcoMetadata;
60use crate::PcoPageInfo;
61
/// Number of values placed in each pco chunk when compressing through
/// `PcoData::from_primitive`; taken from pco's `DEFAULT_MAX_PAGE_N`.
const VALUES_PER_CHUNK: usize = pco::DEFAULT_MAX_PAGE_N;
81
/// An [`Array`] whose encoding is [`Pco`].
pub type PcoArray = Array<Pco>;
84
85impl ArrayHash for PcoData {
86 fn array_hash<H: Hasher>(&self, state: &mut H, precision: Precision) {
87 self.unsliced_n_rows.hash(state);
88 self.slice_start.hash(state);
89 self.slice_stop.hash(state);
90 for chunk_meta in &self.chunk_metas {
92 chunk_meta.array_hash(state, precision);
93 }
94 for page in &self.pages {
95 page.array_hash(state, precision);
96 }
97 }
98}
99
100impl ArrayEq for PcoData {
101 fn array_eq(&self, other: &Self, precision: Precision) -> bool {
102 if self.unsliced_n_rows != other.unsliced_n_rows
103 || self.slice_start != other.slice_start
104 || self.slice_stop != other.slice_stop
105 || self.chunk_metas.len() != other.chunk_metas.len()
106 || self.pages.len() != other.pages.len()
107 {
108 return false;
109 }
110 for (a, b) in self.chunk_metas.iter().zip(&other.chunk_metas) {
111 if !a.array_eq(b, precision) {
112 return false;
113 }
114 }
115 for (a, b) in self.pages.iter().zip(&other.pages) {
116 if !a.array_eq(b, precision) {
117 return false;
118 }
119 }
120 true
121 }
122}
123
124impl VTable for Pco {
125 type TypedArrayData = PcoData;
126
127 type OperationsVTable = Self;
128 type ValidityVTable = Self;
129
130 fn id(&self) -> ArrayId {
131 static ID: CachedId = CachedId::new("vortex.pco");
132 *ID
133 }
134
135 fn validate(
136 &self,
137 data: &PcoData,
138 dtype: &DType,
139 len: usize,
140 slots: &[Option<ArrayRef>],
141 ) -> VortexResult<()> {
142 let validity = child_to_validity(slots[0].as_ref(), dtype.nullability());
143 data.validate(dtype, len, &validity)
144 }
145
146 fn nbuffers(array: ArrayView<'_, Self>) -> usize {
147 array.chunk_metas.len() + array.pages.len()
148 }
149
150 fn buffer(array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
151 if idx < array.chunk_metas.len() {
152 BufferHandle::new_host(array.chunk_metas[idx].clone())
153 } else {
154 let page_idx = idx - array.chunk_metas.len();
155 BufferHandle::new_host(array.pages[page_idx].clone())
156 }
157 }
158
159 fn buffer_name(array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
160 if idx < array.chunk_metas.len() {
161 Some(format!("chunk_meta_{idx}"))
162 } else {
163 Some(format!("page_{}", idx - array.chunk_metas.len()))
164 }
165 }
166
167 fn serialize(
168 array: ArrayView<'_, Self>,
169 _session: &VortexSession,
170 ) -> VortexResult<Option<Vec<u8>>> {
171 Ok(Some(array.metadata.clone().encode_to_vec()))
172 }
173
174 fn deserialize(
175 &self,
176 dtype: &DType,
177 len: usize,
178 metadata: &[u8],
179 buffers: &[BufferHandle],
180 children: &dyn ArrayChildren,
181 _session: &VortexSession,
182 ) -> VortexResult<ArrayParts<Self>> {
183 let metadata = PcoMetadata::decode(metadata)?;
184 let validity = if children.is_empty() {
185 Validity::from(dtype.nullability())
186 } else if children.len() == 1 {
187 let validity = children.get(0, &Validity::DTYPE, len)?;
188 Validity::Array(validity)
189 } else {
190 vortex_bail!("PcoArray expected 0 or 1 child, got {}", children.len());
191 };
192
193 vortex_ensure!(buffers.len() >= metadata.chunks.len());
194 let chunk_metas = buffers[..metadata.chunks.len()]
195 .iter()
196 .map(|b| b.clone().try_to_host_sync())
197 .collect::<VortexResult<Vec<_>>>()?;
198 let pages = buffers[metadata.chunks.len()..]
199 .iter()
200 .map(|b| b.clone().try_to_host_sync())
201 .collect::<VortexResult<Vec<_>>>()?;
202
203 let expected_n_pages = metadata
204 .chunks
205 .iter()
206 .map(|info| info.pages.len())
207 .sum::<usize>();
208 vortex_ensure!(pages.len() == expected_n_pages);
209
210 let slots = smallvec![validity_to_child(&validity, len)];
211 let data = PcoData::new(chunk_metas, pages, dtype.as_ptype(), metadata, len);
212 Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
213 }
214
215 fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
216 SLOT_NAMES[idx].to_string()
217 }
218
219 fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
220 let unsliced_validity = child_to_validity(
221 array.as_ref().slots()[0].as_ref(),
222 array.dtype().nullability(),
223 );
224 Ok(ExecutionResult::done(
225 array
226 .data()
227 .decompress(&unsliced_validity, ctx)?
228 .into_array(),
229 ))
230 }
231
232 fn reduce_parent(
233 array: ArrayView<'_, Self>,
234 parent: &ArrayRef,
235 child_idx: usize,
236 ) -> VortexResult<Option<ArrayRef>> {
237 crate::rules::RULES.evaluate(array, parent, child_idx)
238 }
239}
240
241pub(crate) fn number_type_from_dtype(dtype: &DType) -> NumberType {
242 number_type_from_ptype(dtype.as_ptype())
243}
244
245pub(crate) fn number_type_from_ptype(ptype: PType) -> NumberType {
246 match ptype {
247 PType::F16 => NumberType::F16,
248 PType::F32 => NumberType::F32,
249 PType::F64 => NumberType::F64,
250 PType::I16 => NumberType::I16,
251 PType::I32 => NumberType::I32,
252 PType::I64 => NumberType::I64,
253 PType::U16 => NumberType::U16,
254 PType::U32 => NumberType::U32,
255 PType::U64 => NumberType::U64,
256 _ => unreachable!("PType not supported by Pco: {:?}", ptype),
257 }
258}
259
260fn collect_valid(
261 parray: ArrayView<'_, Primitive>,
262 ctx: &mut ExecutionCtx,
263) -> VortexResult<PrimitiveArray> {
264 let mask = parray
265 .array()
266 .validity()?
267 .execute_mask(parray.array().len(), ctx)?;
268 let result = parray
269 .array()
270 .filter(mask)?
271 .execute::<PrimitiveArray>(ctx)?;
272 Ok(result)
273}
274
275pub(crate) fn vortex_err_from_pco(err: PcoError) -> VortexError {
276 use pco::errors::ErrorKind::*;
277 match err.kind {
278 Io(io_kind) => VortexError::from(std::io::Error::new(io_kind, err.message)),
279 InvalidArgument => vortex_err!(InvalidArgument: "{}", err.message),
280 other => vortex_err!("Pco {:?} error: {}", other, err.message),
281 }
282}
283
/// Marker type for the Pco encoding; the array vtable (`VTable`,
/// `ValidityVTable`, `OperationsVTable`) is implemented on it.
#[derive(Clone, Debug)]
pub struct Pco;
286
287impl Pco {
288 pub(crate) fn try_new(
289 dtype: DType,
290 data: PcoData,
291 validity: Validity,
292 ) -> VortexResult<PcoArray> {
293 let len = data.len();
294 data.validate(&dtype, len, &validity)?;
295 let slots = smallvec![validity_to_child(&validity, data.unsliced_n_rows())];
296 Ok(unsafe {
297 Array::from_parts_unchecked(ArrayParts::new(Pco, dtype, len, data).with_slots(slots))
298 })
299 }
300
301 pub fn from_primitive(
303 parray: ArrayView<'_, Primitive>,
304 level: usize,
305 values_per_page: usize,
306 ctx: &mut ExecutionCtx,
307 ) -> VortexResult<PcoArray> {
308 let dtype = parray.dtype().clone();
309 let validity = parray.validity()?;
310 let data = PcoData::from_primitive(parray, level, values_per_page, ctx)?;
311 Self::try_new(dtype, data, validity)
312 }
313}
314
/// Number of child slots a Pco array carries.
pub(super) const NUM_SLOTS: usize = 1;
/// Slot names indexed by slot position; slot 0 is the validity child.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["validity"];
318
/// Pco-compressed values together with the slice window applied to them.
///
/// Only valid (non-null) values are compressed; validity is stored separately
/// as a child slot of the array.
#[derive(Clone, Debug)]
pub struct PcoData {
    /// Serialized pco chunk metadata, one buffer per chunk.
    pub(crate) chunk_metas: Vec<ByteBuffer>,
    /// Compressed pages of every chunk, stored back to back in chunk order.
    pub(crate) pages: Vec<ByteBuffer>,
    /// The pco file header plus, per chunk, the value count of each page.
    pub(crate) metadata: PcoMetadata,
    /// Primitive type of the decompressed values.
    ptype: PType,
    /// Row count of the array before any slicing (nulls included).
    unsliced_n_rows: usize,
    /// First visible row, in unsliced row coordinates.
    slice_start: usize,
    /// One past the last visible row, in unsliced row coordinates.
    slice_stop: usize,
}
329
330impl Display for PcoData {
331 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
332 write!(
333 f,
334 "ptype: {}, nrows: {}, slice: {}..{}",
335 self.ptype, self.unsliced_n_rows, self.slice_start, self.slice_stop
336 )
337 }
338}
339
340impl PcoData {
341 pub fn validate(&self, dtype: &DType, len: usize, validity: &Validity) -> VortexResult<()> {
342 let _ = number_type_from_ptype(self.ptype);
343 vortex_ensure!(
344 dtype.as_ptype() == self.ptype,
345 "expected ptype {}, got {}",
346 self.ptype,
347 dtype.as_ptype()
348 );
349 vortex_ensure!(
350 dtype.nullability() == validity.nullability(),
351 "expected nullability {}, got {}",
352 validity.nullability(),
353 dtype.nullability()
354 );
355 vortex_ensure!(
356 self.slice_start <= self.slice_stop && self.slice_stop <= self.unsliced_n_rows,
357 "invalid slice range {}..{} for {} rows",
358 self.slice_start,
359 self.slice_stop,
360 self.unsliced_n_rows
361 );
362 vortex_ensure!(
363 self.slice_stop - self.slice_start == len,
364 "expected len {len}, got {}",
365 self.slice_stop - self.slice_start
366 );
367 if let Some(validity_len) = validity.maybe_len() {
368 vortex_ensure!(
369 validity_len == self.unsliced_n_rows,
370 "expected validity len {}, got {}",
371 self.unsliced_n_rows,
372 validity_len
373 );
374 }
375 vortex_ensure!(
376 self.chunk_metas.len() == self.metadata.chunks.len(),
377 "expected {} chunk metas, got {}",
378 self.metadata.chunks.len(),
379 self.chunk_metas.len()
380 );
381 vortex_ensure!(
382 self.pages.len()
383 == self
384 .metadata
385 .chunks
386 .iter()
387 .map(|chunk| chunk.pages.len())
388 .sum::<usize>(),
389 "page count does not match metadata"
390 );
391 Ok(())
392 }
393
394 pub fn new(
395 chunk_metas: Vec<ByteBuffer>,
396 pages: Vec<ByteBuffer>,
397 ptype: PType,
398 metadata: PcoMetadata,
399 len: usize,
400 ) -> Self {
401 Self {
402 chunk_metas,
403 pages,
404 metadata,
405 ptype,
406 unsliced_n_rows: len,
407 slice_start: 0,
408 slice_stop: len,
409 }
410 }
411
412 pub fn from_primitive(
413 parray: ArrayView<'_, Primitive>,
414 level: usize,
415 values_per_page: usize,
416 ctx: &mut ExecutionCtx,
417 ) -> VortexResult<Self> {
418 Self::from_primitive_with_values_per_chunk(
419 parray,
420 level,
421 VALUES_PER_CHUNK,
422 values_per_page,
423 ctx,
424 )
425 }
426
427 pub(crate) fn from_primitive_with_values_per_chunk(
428 parray: ArrayView<'_, Primitive>,
429 level: usize,
430 values_per_chunk: usize,
431 values_per_page: usize,
432 ctx: &mut ExecutionCtx,
433 ) -> VortexResult<Self> {
434 let number_type = number_type_from_dtype(parray.dtype());
435 let values_per_page = if values_per_page == 0 {
436 values_per_chunk
437 } else {
438 values_per_page
439 };
440
441 let chunk_config = ChunkConfig::default()
443 .with_compression_level(level)
444 .with_paging_spec(PagingSpec::EqualPagesUpTo(values_per_page));
445
446 let values = collect_valid(parray, ctx)?;
447 let n_values = values.len();
448
449 let fc = FileCompressor::default();
450 let mut header = vec![];
451 fc.write_header(&mut header).map_err(vortex_err_from_pco)?;
452
453 let mut chunk_meta_buffers = vec![]; let mut chunk_infos = vec![]; let mut page_buffers = vec![];
456 for chunk_start in (0..n_values).step_by(values_per_chunk) {
457 let chunk_end = cmp::min(n_values, chunk_start + values_per_chunk);
458 let mut cc = match_number_enum!(
459 number_type,
460 NumberType<T> => {
461 let values = values.to_buffer::<T>();
462 let chunk = &values.as_slice()[chunk_start..chunk_end];
463 fc
464 .chunk_compressor(chunk, &chunk_config)
465 .map_err(vortex_err_from_pco)?
466 }
467 );
468
469 let mut chunk_meta_buffer = ByteBufferMut::with_capacity(cc.meta_size_hint());
470 cc.write_meta(&mut chunk_meta_buffer)
471 .map_err(vortex_err_from_pco)?;
472 chunk_meta_buffers.push(chunk_meta_buffer.freeze());
473
474 let mut page_infos = vec![];
475 for (page_idx, page_n_values) in cc.n_per_page().into_iter().enumerate() {
476 let mut page = ByteBufferMut::with_capacity(cc.page_size_hint(page_idx));
477 cc.write_page(page_idx, &mut page)
478 .map_err(vortex_err_from_pco)?;
479 page_buffers.push(page.freeze());
480 page_infos.push(PcoPageInfo {
481 n_values: u32::try_from(page_n_values)?,
482 });
483 }
484 chunk_infos.push(PcoChunkInfo { pages: page_infos })
485 }
486
487 let metadata = PcoMetadata {
488 header,
489 chunks: chunk_infos,
490 };
491 Ok(PcoData::new(
492 chunk_meta_buffers,
493 page_buffers,
494 parray.dtype().as_ptype(),
495 metadata,
496 parray.len(),
497 ))
498 }
499
500 pub fn from_array(
501 array: ArrayRef,
502 level: usize,
503 nums_per_page: usize,
504 ctx: &mut ExecutionCtx,
505 ) -> VortexResult<Self> {
506 let parray = array.try_downcast::<Primitive>().map_err(|a| {
507 vortex_err!(
508 "Pco can only encode primitive arrays, got {}",
509 a.encoding_id()
510 )
511 })?;
512 Self::from_primitive(parray.as_view(), level, nums_per_page, ctx)
513 }
514
515 pub fn decompress(
516 &self,
517 unsliced_validity: &Validity,
518 ctx: &mut ExecutionCtx,
519 ) -> VortexResult<PrimitiveArray> {
520 let number_type = number_type_from_ptype(self.ptype);
523 let values_byte_buffer = match_number_enum!(
524 number_type,
525 NumberType<T> => {
526 self.decompress_values_typed::<T>(unsliced_validity, ctx)?
527 }
528 );
529
530 Ok(PrimitiveArray::from_values_byte_buffer(
531 values_byte_buffer,
532 self.ptype,
533 unsliced_validity.slice(self.slice_start..self.slice_stop)?,
534 self.slice_stop - self.slice_start,
535 ))
536 }
537
538 fn decompress_values_typed<T: Number>(
539 &self,
540 unsliced_validity: &Validity,
541 ctx: &mut ExecutionCtx,
542 ) -> VortexResult<ByteBuffer> {
543 let slice_value_indices = unsliced_validity
545 .execute_mask(self.unsliced_n_rows, ctx)?
546 .valid_counts_for_indices(&[self.slice_start, self.slice_stop]);
547 let slice_value_start = slice_value_indices[0];
548 let slice_value_stop = slice_value_indices[1];
549 let slice_n_values = slice_value_stop - slice_value_start;
550
551 let (fd, _) =
554 FileDecompressor::new(self.metadata.header.as_slice()).map_err(vortex_err_from_pco)?;
555 let mut decompressed_values = BufferMut::<T>::with_capacity(slice_n_values);
556 let mut page_idx = 0;
557 let mut page_value_start = 0;
558 let mut n_skipped_values = 0;
559 for (chunk_info, chunk_meta) in self.metadata.chunks.iter().zip(&self.chunk_metas) {
560 let mut chunk_decompressor: Option<ChunkDecompressor<T>> = None;
562 for page_info in &chunk_info.pages {
563 let page_n_values = page_info.n_values as usize;
564 let page_value_stop = page_value_start + page_n_values;
565
566 if page_value_start >= slice_value_stop {
567 break;
568 }
569
570 if page_value_stop > slice_value_start {
571 let old_len = decompressed_values.len();
573 let new_len = old_len + page_n_values;
574 decompressed_values.reserve(page_n_values);
575 unsafe {
576 decompressed_values.set_len(new_len);
577 }
578 let page: &[u8] = self.pages[page_idx].as_ref();
579
580 let mut cd = match chunk_decompressor.take() {
581 Some(d) => d,
582 None => {
583 let (new_cd, _) = fd
584 .chunk_decompressor(chunk_meta.as_ref())
585 .map_err(vortex_err_from_pco)?;
586 new_cd
587 }
588 };
589
590 let mut pd = cd
591 .page_decompressor(page, page_n_values)
592 .map_err(vortex_err_from_pco)?;
593 pd.read(&mut decompressed_values[old_len..new_len])
594 .map_err(vortex_err_from_pco)?;
595
596 chunk_decompressor = Some(cd);
597 } else {
598 n_skipped_values += page_n_values;
599 }
600
601 page_value_start = page_value_stop;
602 page_idx += 1;
603 }
604 }
605
606 let value_offset = slice_value_start - n_skipped_values;
608 Ok(decompressed_values
609 .freeze()
610 .slice(value_offset..value_offset + slice_n_values)
611 .into_byte_buffer())
612 }
613
614 pub(crate) fn _slice(&self, start: usize, stop: usize) -> Self {
615 PcoData {
616 slice_start: self.slice_start + start,
617 slice_stop: self.slice_start + stop,
618 ..self.clone()
619 }
620 }
621
622 pub fn len(&self) -> usize {
624 self.slice_stop - self.slice_start
625 }
626
627 pub fn is_empty(&self) -> bool {
629 self.slice_stop == self.slice_start
630 }
631
632 pub(crate) fn slice_start(&self) -> usize {
633 self.slice_start
634 }
635
636 pub(crate) fn slice_stop(&self) -> usize {
637 self.slice_stop
638 }
639
640 pub(crate) fn unsliced_n_rows(&self) -> usize {
641 self.unsliced_n_rows
642 }
643}
644
645impl ValidityVTable<Pco> for Pco {
646 fn validity(array: ArrayView<'_, Pco>) -> VortexResult<Validity> {
647 let unsliced_validity =
648 child_to_validity(array.slots()[0].as_ref(), array.dtype().nullability());
649 unsliced_validity.slice(array.slice_start()..array.slice_stop())
650 }
651}
652
653impl OperationsVTable<Pco> for Pco {
654 fn scalar_at(
655 array: ArrayView<'_, Pco>,
656 index: usize,
657 ctx: &mut ExecutionCtx,
658 ) -> VortexResult<Scalar> {
659 let unsliced_validity =
660 child_to_validity(array.slots()[0].as_ref(), array.dtype().nullability());
661 array
662 ._slice(index, index + 1)
663 .decompress(&unsliced_validity, ctx)?
664 .into_array()
665 .execute_scalar(0, ctx)
666 }
667}
668
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use crate::Pco;

    /// A nullable array must round-trip through Pco, and slicing it must keep
    /// values lined up with their validity.
    #[test]
    fn test_slice_nullable() {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        // The first and last rows are null, so only 20..=50 are actually compressed.
        let input = PrimitiveArray::new(
            buffer![10u32, 20, 30, 40, 50, 60],
            Validity::from_iter([false, true, true, true, true, false]),
        );
        let encoded = Pco::from_primitive(input.as_view(), 0, 128, &mut ctx).unwrap();
        assert_arrays_eq!(
            encoded,
            PrimitiveArray::from_option_iter([
                None,
                Some(20u32),
                Some(30),
                Some(40),
                Some(50),
                None
            ])
        );

        // Dropping the null rows at both ends leaves only valid values.
        let window = encoded.slice(1..5).unwrap();
        let expected_window =
            PrimitiveArray::from_option_iter([Some(20u32), Some(30), Some(40), Some(50)])
                .into_array();
        assert_arrays_eq!(window, expected_window);
    }
}