1use std::cmp;
5use std::fmt::Debug;
6use std::fmt::Display;
7use std::fmt::Formatter;
8use std::hash::Hash;
9use std::hash::Hasher;
10
11use pco::ChunkConfig;
12use pco::PagingSpec;
13use pco::data_types::Number;
14use pco::data_types::NumberType;
15use pco::errors::PcoError;
16use pco::match_number_enum;
17use pco::wrapped::ChunkDecompressor;
18use pco::wrapped::FileCompressor;
19use pco::wrapped::FileDecompressor;
20use prost::Message;
21use vortex_array::Array;
22use vortex_array::ArrayEq;
23use vortex_array::ArrayHash;
24use vortex_array::ArrayId;
25use vortex_array::ArrayParts;
26use vortex_array::ArrayRef;
27use vortex_array::ArrayView;
28use vortex_array::ExecutionCtx;
29use vortex_array::ExecutionResult;
30use vortex_array::IntoArray;
31use vortex_array::LEGACY_SESSION;
32use vortex_array::Precision;
33use vortex_array::ToCanonical;
34use vortex_array::VortexSessionExecute;
35use vortex_array::arrays::Primitive;
36use vortex_array::arrays::PrimitiveArray;
37use vortex_array::buffer::BufferHandle;
38use vortex_array::dtype::DType;
39use vortex_array::dtype::PType;
40use vortex_array::dtype::half;
41use vortex_array::scalar::Scalar;
42use vortex_array::serde::ArrayChildren;
43use vortex_array::validity::Validity;
44use vortex_array::vtable::OperationsVTable;
45use vortex_array::vtable::VTable;
46use vortex_array::vtable::ValidityVTable;
47use vortex_array::vtable::child_to_validity;
48use vortex_array::vtable::validity_to_child;
49use vortex_buffer::BufferMut;
50use vortex_buffer::ByteBuffer;
51use vortex_buffer::ByteBufferMut;
52use vortex_error::VortexError;
53use vortex_error::VortexResult;
54use vortex_error::vortex_bail;
55use vortex_error::vortex_ensure;
56use vortex_error::vortex_err;
57use vortex_session::VortexSession;
58use vortex_session::registry::CachedId;
59
60use crate::PcoChunkInfo;
61use crate::PcoMetadata;
62use crate::PcoPageInfo;
63
/// Default number of values per pco chunk; pinned to pco's maximum page size
/// so a default-configured chunk is a single page.
const VALUES_PER_CHUNK: usize = pco::DEFAULT_MAX_PAGE_N;
83
/// A Vortex array using the [`Pco`] encoding.
pub type PcoArray = Array<Pco>;
86
87impl ArrayHash for PcoData {
88 fn array_hash<H: Hasher>(&self, state: &mut H, precision: Precision) {
89 self.unsliced_n_rows.hash(state);
90 self.slice_start.hash(state);
91 self.slice_stop.hash(state);
92 for chunk_meta in &self.chunk_metas {
94 chunk_meta.array_hash(state, precision);
95 }
96 for page in &self.pages {
97 page.array_hash(state, precision);
98 }
99 }
100}
101
102impl ArrayEq for PcoData {
103 fn array_eq(&self, other: &Self, precision: Precision) -> bool {
104 if self.unsliced_n_rows != other.unsliced_n_rows
105 || self.slice_start != other.slice_start
106 || self.slice_stop != other.slice_stop
107 || self.chunk_metas.len() != other.chunk_metas.len()
108 || self.pages.len() != other.pages.len()
109 {
110 return false;
111 }
112 for (a, b) in self.chunk_metas.iter().zip(&other.chunk_metas) {
113 if !a.array_eq(b, precision) {
114 return false;
115 }
116 }
117 for (a, b) in self.pages.iter().zip(&other.pages) {
118 if !a.array_eq(b, precision) {
119 return false;
120 }
121 }
122 true
123 }
124}
125
impl VTable for Pco {
    type ArrayData = PcoData;

    type OperationsVTable = Self;
    type ValidityVTable = Self;

    fn id(&self) -> ArrayId {
        // Cached so the id string is only interned once.
        static ID: CachedId = CachedId::new("vortex.pco");
        *ID
    }

    /// Re-checks all `PcoData` invariants against the declared dtype/len.
    fn validate(
        &self,
        data: &PcoData,
        dtype: &DType,
        len: usize,
        slots: &[Option<ArrayRef>],
    ) -> VortexResult<()> {
        // Slot 0 is the (optional) validity child; see SLOT_NAMES.
        let validity = child_to_validity(&slots[0], dtype.nullability());
        data.validate(dtype, len, &validity)
    }

    // Buffer layout contract: all chunk-metadata buffers first, then all
    // page buffers, in order. `buffer`, `buffer_name`, and `deserialize`
    // all rely on this ordering.
    fn nbuffers(array: ArrayView<'_, Self>) -> usize {
        array.chunk_metas.len() + array.pages.len()
    }

    fn buffer(array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
        if idx < array.chunk_metas.len() {
            BufferHandle::new_host(array.chunk_metas[idx].clone())
        } else {
            // Page buffers follow the chunk metas.
            let page_idx = idx - array.chunk_metas.len();
            BufferHandle::new_host(array.pages[page_idx].clone())
        }
    }

    fn buffer_name(array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
        if idx < array.chunk_metas.len() {
            Some(format!("chunk_meta_{idx}"))
        } else {
            Some(format!("page_{}", idx - array.chunk_metas.len()))
        }
    }

    /// Serializes only the protobuf metadata; the compressed buffers are
    /// transported separately via `buffer`/`nbuffers`.
    fn serialize(
        array: ArrayView<'_, Self>,
        _session: &VortexSession,
    ) -> VortexResult<Option<Vec<u8>>> {
        Ok(Some(array.metadata.clone().encode_to_vec()))
    }

    /// Rebuilds a Pco array from protobuf metadata, the buffer list (chunk
    /// metas then pages), and an optional validity child.
    fn deserialize(
        &self,
        dtype: &DType,
        len: usize,
        metadata: &[u8],
        buffers: &[BufferHandle],
        children: &dyn ArrayChildren,
        _session: &VortexSession,
    ) -> VortexResult<ArrayParts<Self>> {
        let metadata = PcoMetadata::decode(metadata)?;
        // Zero children: validity is implied by nullability; one child: an
        // explicit validity array.
        let validity = if children.is_empty() {
            Validity::from(dtype.nullability())
        } else if children.len() == 1 {
            let validity = children.get(0, &Validity::DTYPE, len)?;
            Validity::Array(validity)
        } else {
            vortex_bail!("PcoArray expected 0 or 1 child, got {}", children.len());
        };

        // The first `chunks.len()` buffers are chunk metas; the rest are pages.
        vortex_ensure!(buffers.len() >= metadata.chunks.len());
        let chunk_metas = buffers[..metadata.chunks.len()]
            .iter()
            .map(|b| b.clone().try_to_host_sync())
            .collect::<VortexResult<Vec<_>>>()?;
        let pages = buffers[metadata.chunks.len()..]
            .iter()
            .map(|b| b.clone().try_to_host_sync())
            .collect::<VortexResult<Vec<_>>>()?;

        // The page buffers must match the per-chunk page counts recorded in
        // the metadata.
        let expected_n_pages = metadata
            .chunks
            .iter()
            .map(|info| info.pages.len())
            .sum::<usize>();
        vortex_ensure!(pages.len() == expected_n_pages);

        let slots = vec![validity_to_child(&validity, len)];
        let data = PcoData::new(chunk_metas, pages, dtype.as_ptype(), metadata, len);
        Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
    }

    fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
        SLOT_NAMES[idx].to_string()
    }

    /// Canonicalization: fully decompress into a primitive array.
    fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
        // The stored validity covers the *unsliced* rows; `decompress`
        // narrows it to the current slice.
        let unsliced_validity =
            child_to_validity(&array.as_ref().slots()[0], array.dtype().nullability());
        Ok(ExecutionResult::done(
            array
                .data()
                .decompress(&unsliced_validity, ctx)?
                .into_array(),
        ))
    }

    /// Delegates compute push-down decisions to the crate's rule set.
    fn reduce_parent(
        array: ArrayView<'_, Self>,
        parent: &ArrayRef,
        child_idx: usize,
    ) -> VortexResult<Option<ArrayRef>> {
        crate::rules::RULES.evaluate(array, parent, child_idx)
    }
}
240
241pub(crate) fn number_type_from_dtype(dtype: &DType) -> NumberType {
242 number_type_from_ptype(dtype.as_ptype())
243}
244
245pub(crate) fn number_type_from_ptype(ptype: PType) -> NumberType {
246 match ptype {
247 PType::F16 => NumberType::F16,
248 PType::F32 => NumberType::F32,
249 PType::F64 => NumberType::F64,
250 PType::I16 => NumberType::I16,
251 PType::I32 => NumberType::I32,
252 PType::I64 => NumberType::I64,
253 PType::U16 => NumberType::U16,
254 PType::U32 => NumberType::U32,
255 PType::U64 => NumberType::U64,
256 _ => unreachable!("PType not supported by Pco: {:?}", ptype),
257 }
258}
259
260fn collect_valid(parray: ArrayView<'_, Primitive>) -> VortexResult<PrimitiveArray> {
261 let mask = parray.array().validity()?.to_mask(
262 parray.array().len(),
263 &mut LEGACY_SESSION.create_execution_ctx(),
264 )?;
265 Ok(parray.array().filter(mask)?.to_primitive())
266}
267
268pub(crate) fn vortex_err_from_pco(err: PcoError) -> VortexError {
269 use pco::errors::ErrorKind::*;
270 match err.kind {
271 Io(io_kind) => VortexError::from(std::io::Error::new(io_kind, err.message)),
272 InvalidArgument => vortex_err!(InvalidArgument: "{}", err.message),
273 other => vortex_err!("Pco {:?} error: {}", other, err.message),
274 }
275}
276
/// Vortex encoding backed by the pco ("Pcodec") numerical compressor.
///
/// This is a stateless marker type; all per-array state lives in [`PcoData`].
#[derive(Clone, Debug)]
pub struct Pco;
279
impl Pco {
    /// Builds a [`PcoArray`] from already-compressed `data`, validating it
    /// against `dtype` and `validity` first.
    pub(crate) fn try_new(
        dtype: DType,
        data: PcoData,
        validity: Validity,
    ) -> VortexResult<PcoArray> {
        let len = data.len();
        data.validate(&dtype, len, &validity)?;
        // The validity child spans the *unsliced* rows, not just `len`.
        let slots = vec![validity_to_child(&validity, data.unsliced_n_rows())];
        // SAFETY: `data.validate` above has checked the invariants that
        // `from_parts_unchecked` skips re-checking.
        Ok(unsafe {
            Array::from_parts_unchecked(ArrayParts::new(Pco, dtype, len, data).with_slots(slots))
        })
    }

    /// Compresses a primitive array with pco at the given compression
    /// `level`, splitting values into pages of at most `values_per_page`
    /// values (`0` falls back to one page per chunk).
    pub fn from_primitive(
        parray: ArrayView<'_, Primitive>,
        level: usize,
        values_per_page: usize,
    ) -> VortexResult<PcoArray> {
        let dtype = parray.dtype().clone();
        let validity = parray.validity()?;
        let data = PcoData::from_primitive(parray, level, values_per_page)?;
        Self::try_new(dtype, data, validity)
    }
}
306
// A Pco array carries exactly one child slot: the validity array.
pub(super) const NUM_SLOTS: usize = 1;
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["validity"];
310
/// Owned state of a Pco-encoded array: the compressed buffers plus logical
/// slice bounds over the encoded rows. Slicing is zero-copy — only
/// `slice_start`/`slice_stop` change.
#[derive(Clone, Debug)]
pub struct PcoData {
    // One serialized pco chunk-metadata buffer per chunk.
    pub(crate) chunk_metas: Vec<ByteBuffer>,
    // Compressed page buffers, laid out chunk by chunk, in order.
    pub(crate) pages: Vec<ByteBuffer>,
    // Protobuf metadata: pco file header plus per-chunk page value counts.
    pub(crate) metadata: PcoMetadata,
    // Primitive type of the encoded values.
    ptype: PType,
    // Total number of rows compressed, before any slicing.
    unsliced_n_rows: usize,
    // Logical slice bounds (in rows) into the unsliced data.
    slice_start: usize,
    slice_stop: usize,
}
321
322impl Display for PcoData {
323 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
324 write!(
325 f,
326 "ptype: {}, nrows: {}, slice: {}..{}",
327 self.ptype, self.unsliced_n_rows, self.slice_start, self.slice_stop
328 )
329 }
330}
331
impl PcoData {
    /// Checks every internal invariant against the caller-supplied `dtype`,
    /// logical `len`, and `validity`.
    pub fn validate(&self, dtype: &DType, len: usize, validity: &Validity) -> VortexResult<()> {
        // NOTE(review): this *panics* (rather than returning an error) if
        // `self.ptype` is not encodable by pco — confirm that is intended.
        let _ = number_type_from_ptype(self.ptype);
        vortex_ensure!(
            dtype.as_ptype() == self.ptype,
            "expected ptype {}, got {}",
            self.ptype,
            dtype.as_ptype()
        );
        vortex_ensure!(
            dtype.nullability() == validity.nullability(),
            "expected nullability {}, got {}",
            validity.nullability(),
            dtype.nullability()
        );
        // Slice bounds must be ordered and within the unsliced row count.
        vortex_ensure!(
            self.slice_start <= self.slice_stop && self.slice_stop <= self.unsliced_n_rows,
            "invalid slice range {}..{} for {} rows",
            self.slice_start,
            self.slice_stop,
            self.unsliced_n_rows
        );
        vortex_ensure!(
            self.slice_stop - self.slice_start == len,
            "expected len {len}, got {}",
            self.slice_stop - self.slice_start
        );
        // When the validity's length is knowable, it must span the *unsliced*
        // rows, not just the current slice.
        if let Some(validity_len) = validity.maybe_len() {
            vortex_ensure!(
                validity_len == self.unsliced_n_rows,
                "expected validity len {}, got {}",
                self.unsliced_n_rows,
                validity_len
            );
        }
        vortex_ensure!(
            self.chunk_metas.len() == self.metadata.chunks.len(),
            "expected {} chunk metas, got {}",
            self.metadata.chunks.len(),
            self.chunk_metas.len()
        );
        // Total page buffers must equal the page counts recorded per chunk.
        vortex_ensure!(
            self.pages.len()
                == self
                    .metadata
                    .chunks
                    .iter()
                    .map(|chunk| chunk.pages.len())
                    .sum::<usize>(),
            "page count does not match metadata"
        );
        Ok(())
    }

    /// Assembles an unsliced `PcoData`: the slice covers all `len` rows.
    pub fn new(
        chunk_metas: Vec<ByteBuffer>,
        pages: Vec<ByteBuffer>,
        ptype: PType,
        metadata: PcoMetadata,
        len: usize,
    ) -> Self {
        Self {
            chunk_metas,
            pages,
            metadata,
            ptype,
            unsliced_n_rows: len,
            slice_start: 0,
            slice_stop: len,
        }
    }

    /// Compresses `parray` using the default chunk size
    /// ([`VALUES_PER_CHUNK`]).
    pub fn from_primitive(
        parray: ArrayView<'_, Primitive>,
        level: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        Self::from_primitive_with_values_per_chunk(parray, level, VALUES_PER_CHUNK, values_per_page)
    }

    /// Compresses `parray` into pco chunks of `values_per_chunk` *valid*
    /// values, each chunk split into pages of up to `values_per_page` values
    /// (`values_per_page == 0` falls back to one page per chunk).
    ///
    /// Nulls are filtered out before compression; only valid values are
    /// stored. Row count and null positions are carried by the metadata and
    /// the validity child, respectively.
    pub(crate) fn from_primitive_with_values_per_chunk(
        parray: ArrayView<'_, Primitive>,
        level: usize,
        values_per_chunk: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        let number_type = number_type_from_dtype(parray.dtype());
        let values_per_page = if values_per_page == 0 {
            values_per_chunk
        } else {
            values_per_page
        };

        let chunk_config = ChunkConfig::default()
            .with_compression_level(level)
            .with_paging_spec(PagingSpec::EqualPagesUpTo(values_per_page));

        // Densify: pco compresses only the valid values.
        let values = collect_valid(parray)?;
        let n_values = values.len();

        let fc = FileCompressor::default();
        let mut header = vec![];
        fc.write_header(&mut header).map_err(vortex_err_from_pco)?;

        let mut chunk_meta_buffers = vec![];
        let mut chunk_infos = vec![];
        let mut page_buffers = vec![];
        for chunk_start in (0..n_values).step_by(values_per_chunk) {
            // The final chunk may be short.
            let chunk_end = cmp::min(n_values, chunk_start + values_per_chunk);
            // Dispatch on the concrete number type to build the typed
            // chunk compressor over this chunk's value slice.
            let mut cc = match_number_enum!(
                number_type,
                NumberType<T> => {
                    let values = values.to_buffer::<T>();
                    let chunk = &values.as_slice()[chunk_start..chunk_end];
                    fc
                    .chunk_compressor(chunk, &chunk_config)
                    .map_err(vortex_err_from_pco)?
                }
            );

            // Chunk metadata buffer, stored separately from the pages.
            let mut chunk_meta_buffer = ByteBufferMut::with_capacity(cc.meta_size_hint());
            cc.write_meta(&mut chunk_meta_buffer)
                .map_err(vortex_err_from_pco)?;
            chunk_meta_buffers.push(chunk_meta_buffer.freeze());

            // One buffer per page; record each page's value count so
            // decompression can skip pages outside a slice.
            let mut page_infos = vec![];
            for (page_idx, page_n_values) in cc.n_per_page().into_iter().enumerate() {
                let mut page = ByteBufferMut::with_capacity(cc.page_size_hint(page_idx));
                cc.write_page(page_idx, &mut page)
                    .map_err(vortex_err_from_pco)?;
                page_buffers.push(page.freeze());
                page_infos.push(PcoPageInfo {
                    n_values: u32::try_from(page_n_values)?,
                });
            }
            chunk_infos.push(PcoChunkInfo { pages: page_infos })
        }

        let metadata = PcoMetadata {
            header,
            chunks: chunk_infos,
        };
        Ok(PcoData::new(
            chunk_meta_buffers,
            page_buffers,
            parray.dtype().as_ptype(),
            metadata,
            parray.len(),
        ))
    }

    /// Compresses `array`, which must already be a primitive array
    /// (no canonicalization is attempted here).
    pub fn from_array(array: ArrayRef, level: usize, nums_per_page: usize) -> VortexResult<Self> {
        let parray = array.try_downcast::<Primitive>().map_err(|a| {
            vortex_err!(
                "Pco can only encode primitive arrays, got {}",
                a.encoding_id()
            )
        })?;
        Self::from_primitive(parray.as_view(), level, nums_per_page)
    }

    /// Decompresses the current slice back into a canonical primitive array.
    ///
    /// `unsliced_validity` must cover all `unsliced_n_rows` rows; it is
    /// narrowed to the slice bounds for the result.
    pub fn decompress(
        &self,
        unsliced_validity: &Validity,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<PrimitiveArray> {
        let number_type = number_type_from_ptype(self.ptype);
        // Dispatch on the concrete number type, then reassemble the array
        // from the raw value bytes.
        let values_byte_buffer = match_number_enum!(
            number_type,
            NumberType<T> => {
                self.decompress_values_typed::<T>(unsliced_validity, ctx)?
            }
        );

        Ok(PrimitiveArray::from_values_byte_buffer(
            values_byte_buffer,
            self.ptype,
            unsliced_validity.slice(self.slice_start..self.slice_stop)?,
            self.slice_stop - self.slice_start,
        ))
    }

    /// Decompresses only the pages overlapping the current slice and returns
    /// the slice's valid values as raw bytes.
    fn decompress_values_typed<T: Number>(
        &self,
        unsliced_validity: &Validity,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<ByteBuffer> {
        // Translate row-based slice bounds into *value* (non-null) indices,
        // since pco stores only the valid values.
        let slice_value_indices = unsliced_validity
            .execute_mask(self.unsliced_n_rows, ctx)?
            .valid_counts_for_indices(&[self.slice_start, self.slice_stop]);
        let slice_value_start = slice_value_indices[0];
        let slice_value_stop = slice_value_indices[1];
        let slice_n_values = slice_value_stop - slice_value_start;

        let (fd, _) =
            FileDecompressor::new(self.metadata.header.as_slice()).map_err(vortex_err_from_pco)?;
        let mut decompressed_values = BufferMut::<T>::with_capacity(slice_n_values);
        let mut page_idx = 0;
        let mut page_value_start = 0;
        // Values in pages wholly before the slice, skipped undecompressed.
        let mut n_skipped_values = 0;
        for (chunk_info, chunk_meta) in self.metadata.chunks.iter().zip(&self.chunk_metas) {
            // Built lazily so chunks without overlapping pages pay nothing.
            let mut chunk_decompressor: Option<ChunkDecompressor<T>> = None;
            for page_info in &chunk_info.pages {
                let page_n_values = page_info.n_values as usize;
                let page_value_stop = page_value_start + page_n_values;

                // All remaining pages start at or past the slice end; since
                // page_value_start only grows, later chunks break immediately
                // too, so the stale page_idx is never read again.
                if page_value_start >= slice_value_stop {
                    break;
                }

                if page_value_stop > slice_value_start {
                    // Page overlaps the slice: decompress it in full.
                    let old_len = decompressed_values.len();
                    let new_len = old_len + page_n_values;
                    decompressed_values.reserve(page_n_values);
                    // SAFETY: capacity for `page_n_values` more elements was
                    // just reserved, and `pd.read` below fills the whole
                    // `old_len..new_len` range before it is ever read.
                    unsafe {
                        decompressed_values.set_len(new_len);
                    }
                    let page: &[u8] = self.pages[page_idx].as_ref();

                    // Reuse the chunk decompressor across this chunk's pages.
                    let mut cd = match chunk_decompressor.take() {
                        Some(d) => d,
                        None => {
                            let (new_cd, _) = fd
                                .chunk_decompressor(chunk_meta.as_ref())
                                .map_err(vortex_err_from_pco)?;
                            new_cd
                        }
                    };

                    let mut pd = cd
                        .page_decompressor(page, page_n_values)
                        .map_err(vortex_err_from_pco)?;
                    pd.read(&mut decompressed_values[old_len..new_len])
                        .map_err(vortex_err_from_pco)?;

                    chunk_decompressor = Some(cd);
                } else {
                    // Page lies entirely before the slice.
                    n_skipped_values += page_n_values;
                }

                page_value_start = page_value_stop;
                page_idx += 1;
            }
        }

        // The first decompressed page may begin before the slice; trim the
        // leading (and trailing) values outside the requested value range.
        let value_offset = slice_value_start - n_skipped_values;
        Ok(decompressed_values
            .freeze()
            .slice(value_offset..value_offset + slice_n_values)
            .into_byte_buffer())
    }

    /// Returns a view over rows `start..stop`, *relative to the current
    /// slice*; buffers are shared, nothing is re-encoded.
    pub(crate) fn _slice(&self, start: usize, stop: usize) -> Self {
        PcoData {
            slice_start: self.slice_start + start,
            slice_stop: self.slice_start + stop,
            ..self.clone()
        }
    }

    /// Logical (sliced) row count.
    pub fn len(&self) -> usize {
        self.slice_stop - self.slice_start
    }

    pub fn is_empty(&self) -> bool {
        self.slice_stop == self.slice_start
    }

    pub(crate) fn slice_start(&self) -> usize {
        self.slice_start
    }

    pub(crate) fn slice_stop(&self) -> usize {
        self.slice_stop
    }

    /// Row count before slicing; the size the validity child must match.
    pub(crate) fn unsliced_n_rows(&self) -> usize {
        self.unsliced_n_rows
    }
}
623
624impl ValidityVTable<Pco> for Pco {
625 fn validity(array: ArrayView<'_, Pco>) -> VortexResult<Validity> {
626 let unsliced_validity = child_to_validity(&array.slots()[0], array.dtype().nullability());
627 unsliced_validity.slice(array.slice_start()..array.slice_stop())
628 }
629}
630
631impl OperationsVTable<Pco> for Pco {
632 fn scalar_at(
633 array: ArrayView<'_, Pco>,
634 index: usize,
635 ctx: &mut ExecutionCtx,
636 ) -> VortexResult<Scalar> {
637 let unsliced_validity = child_to_validity(&array.slots()[0], array.dtype().nullability());
638 array
639 ._slice(index, index + 1)
640 .decompress(&unsliced_validity, ctx)?
641 .into_array()
642 .execute_scalar(0, ctx)
643 }
644}
645
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use crate::Pco;

    /// Round-trips a nullable primitive array through Pco and checks both the
    /// full array and a zero-copy slice decompress correctly.
    #[test]
    fn test_slice_nullable() {
        // Nulls at both ends exercise the row->value index translation.
        let values = PrimitiveArray::new(
            buffer![10u32, 20, 30, 40, 50, 60],
            Validity::from_iter([false, true, true, true, true, false]),
        );
        // level 0, 128 values per page: everything fits in a single page.
        let pco = Pco::from_primitive(values.as_view(), 0, 128).unwrap();
        assert_arrays_eq!(
            pco,
            PrimitiveArray::from_option_iter([
                None,
                Some(20u32),
                Some(30),
                Some(40),
                Some(50),
                None
            ])
        );

        // Slicing away the null edges must still decompress the right values.
        let sliced = pco.slice(1..5).unwrap();
        let expected =
            PrimitiveArray::from_option_iter([Some(20u32), Some(30), Some(40), Some(50)])
                .into_array();
        assert_arrays_eq!(sliced, expected);
    }
}