1use std::cmp;
5use std::fmt::Debug;
6use std::hash::Hash;
7
8use pco::ChunkConfig;
9use pco::PagingSpec;
10use pco::data_types::Number;
11use pco::data_types::NumberType;
12use pco::errors::PcoError;
13use pco::match_number_enum;
14use pco::wrapped::ChunkDecompressor;
15use pco::wrapped::FileCompressor;
16use pco::wrapped::FileDecompressor;
17use prost::Message;
18use vortex_array::ArrayBufferVisitor;
19use vortex_array::ArrayChildVisitor;
20use vortex_array::ArrayEq;
21use vortex_array::ArrayHash;
22use vortex_array::ArrayRef;
23use vortex_array::ExecutionCtx;
24use vortex_array::IntoArray;
25use vortex_array::Precision;
26use vortex_array::ProstMetadata;
27use vortex_array::ToCanonical;
28use vortex_array::arrays::PrimitiveArray;
29use vortex_array::arrays::PrimitiveVTable;
30use vortex_array::buffer::BufferHandle;
31use vortex_array::compute::filter;
32use vortex_array::scalar::Scalar;
33use vortex_array::serde::ArrayChildren;
34use vortex_array::stats::ArrayStats;
35use vortex_array::stats::StatsSetRef;
36use vortex_array::validity::Validity;
37use vortex_array::vtable;
38use vortex_array::vtable::ArrayId;
39use vortex_array::vtable::BaseArrayVTable;
40use vortex_array::vtable::OperationsVTable;
41use vortex_array::vtable::VTable;
42use vortex_array::vtable::ValidityHelper;
43use vortex_array::vtable::ValiditySliceHelper;
44use vortex_array::vtable::ValidityVTableFromValiditySliceHelper;
45use vortex_array::vtable::VisitorVTable;
46use vortex_array::vtable::validity_nchildren;
47use vortex_buffer::BufferMut;
48use vortex_buffer::ByteBuffer;
49use vortex_buffer::ByteBufferMut;
50use vortex_dtype::DType;
51use vortex_dtype::PType;
52use vortex_dtype::half;
53use vortex_error::VortexError;
54use vortex_error::VortexExpect as _;
55use vortex_error::VortexResult;
56use vortex_error::vortex_bail;
57use vortex_error::vortex_ensure;
58use vortex_error::vortex_err;
59use vortex_session::VortexSession;
60
61use crate::PcoChunkInfo;
62use crate::PcoMetadata;
63use crate::PcoPageInfo;
64
/// Maximum number of (valid) values `PcoArray::from_primitive` places in a single pco chunk.
/// Set to pco's `DEFAULT_MAX_PAGE_N`.
const VALUES_PER_CHUNK: usize = pco::DEFAULT_MAX_PAGE_N;

// Encoding boilerplate generated by `vortex_array::vtable!` for the `Pco` encoding.
// NOTE(review): presumably ties `PcoVTable` / `PcoArray` into the array framework — confirm
// against the macro definition.
vtable!(Pco);
86
87impl VTable for PcoVTable {
88 type Array = PcoArray;
89
90 type Metadata = ProstMetadata<PcoMetadata>;
91
92 type ArrayVTable = Self;
93 type OperationsVTable = Self;
94 type ValidityVTable = ValidityVTableFromValiditySliceHelper;
95 type VisitorVTable = Self;
96
97 fn id(_array: &Self::Array) -> ArrayId {
98 Self::ID
99 }
100
101 fn metadata(array: &PcoArray) -> VortexResult<Self::Metadata> {
102 Ok(ProstMetadata(array.metadata.clone()))
103 }
104
105 fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
106 Ok(Some(metadata.0.encode_to_vec()))
107 }
108
109 fn deserialize(
110 bytes: &[u8],
111 _dtype: &DType,
112 _len: usize,
113 _buffers: &[BufferHandle],
114 _session: &VortexSession,
115 ) -> VortexResult<Self::Metadata> {
116 Ok(ProstMetadata(PcoMetadata::decode(bytes)?))
117 }
118
119 fn build(
120 dtype: &DType,
121 len: usize,
122 metadata: &Self::Metadata,
123 buffers: &[BufferHandle],
124 children: &dyn ArrayChildren,
125 ) -> VortexResult<PcoArray> {
126 let validity = if children.is_empty() {
127 Validity::from(dtype.nullability())
128 } else if children.len() == 1 {
129 let validity = children.get(0, &Validity::DTYPE, len)?;
130 Validity::Array(validity)
131 } else {
132 vortex_bail!("PcoArray expected 0 or 1 child, got {}", children.len());
133 };
134
135 vortex_ensure!(buffers.len() >= metadata.0.chunks.len());
136 let chunk_metas = buffers[..metadata.0.chunks.len()]
137 .iter()
138 .map(|b| b.clone().try_to_host_sync())
139 .collect::<VortexResult<Vec<_>>>()?;
140 let pages = buffers[metadata.0.chunks.len()..]
141 .iter()
142 .map(|b| b.clone().try_to_host_sync())
143 .collect::<VortexResult<Vec<_>>>()?;
144
145 let expected_n_pages = metadata
146 .0
147 .chunks
148 .iter()
149 .map(|info| info.pages.len())
150 .sum::<usize>();
151 vortex_ensure!(pages.len() == expected_n_pages);
152
153 Ok(PcoArray::new(
154 chunk_metas,
155 pages,
156 dtype.clone(),
157 metadata.0.clone(),
158 len,
159 validity,
160 ))
161 }
162
163 fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
164 vortex_ensure!(
165 children.len() <= 1,
166 "PcoArray expects 0 or 1 children, got {}",
167 children.len()
168 );
169
170 if children.is_empty() {
171 array.unsliced_validity = Validity::from(array.dtype.nullability());
172 } else {
173 array.unsliced_validity =
174 Validity::Array(children.into_iter().next().vortex_expect("validity child"));
175 }
176
177 Ok(())
178 }
179
180 fn execute(array: &Self::Array, _ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
181 Ok(array.decompress()?.into_array())
182 }
183
184 fn reduce_parent(
185 array: &Self::Array,
186 parent: &ArrayRef,
187 child_idx: usize,
188 ) -> VortexResult<Option<ArrayRef>> {
189 crate::rules::RULES.evaluate(array, parent, child_idx)
190 }
191}
192
193pub(crate) fn number_type_from_dtype(dtype: &DType) -> NumberType {
194 let ptype = dtype.as_ptype();
195 match ptype {
196 PType::F16 => NumberType::F16,
197 PType::F32 => NumberType::F32,
198 PType::F64 => NumberType::F64,
199 PType::I16 => NumberType::I16,
200 PType::I32 => NumberType::I32,
201 PType::I64 => NumberType::I64,
202 PType::U16 => NumberType::U16,
203 PType::U32 => NumberType::U32,
204 PType::U64 => NumberType::U64,
205 _ => unreachable!("PType not supported by Pco: {:?}", ptype),
206 }
207}
208
209fn collect_valid(parray: &PrimitiveArray) -> VortexResult<PrimitiveArray> {
210 let mask = parray.validity_mask()?;
211 Ok(filter(&parray.to_array(), &mask)?.to_primitive())
212}
213
214pub(crate) fn vortex_err_from_pco(err: PcoError) -> VortexError {
215 use pco::errors::ErrorKind::*;
216 match err.kind {
217 Io(io_kind) => VortexError::from(std::io::Error::new(io_kind, err.message)),
218 InvalidArgument => vortex_err!(InvalidArgument: "{}", err.message),
219 other => vortex_err!("Pco {:?} error: {}", other, err.message),
220 }
221}
222
/// VTable marker type for the pco-compressed primitive array encoding (`PcoArray`).
#[derive(Debug)]
pub struct PcoVTable;

impl PcoVTable {
    /// Encoding identifier under which pco arrays are registered/serialized.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.pco");
}
229
/// A primitive array whose valid (non-null) values are compressed with pco.
///
/// Nulls are not stored in the pco data; row-level nullability lives in `unsliced_validity`.
/// Slicing is lazy: `slice_start..slice_stop` selects a row window over the unsliced data,
/// which is only decoded on `decompress`.
#[derive(Clone, Debug)]
pub struct PcoArray {
    // One serialized pco chunk-metadata buffer per chunk.
    pub(crate) chunk_metas: Vec<ByteBuffer>,
    // Page buffers of all chunks, stored back to back in chunk order.
    pub(crate) pages: Vec<ByteBuffer>,
    // pco file header plus, per chunk, the number of values in each page.
    pub(crate) metadata: PcoMetadata,
    dtype: DType,
    // Validity covering all `unsliced_n_rows` rows (not just the slice window).
    pub(crate) unsliced_validity: Validity,
    // Row count (nulls included) before any slicing.
    unsliced_n_rows: usize,
    stats_set: ArrayStats,
    // Start (inclusive) of the visible row window, in unsliced row coordinates.
    slice_start: usize,
    // End (exclusive) of the visible row window, in unsliced row coordinates.
    slice_stop: usize,
}
242
243impl PcoArray {
244 pub fn new(
245 chunk_metas: Vec<ByteBuffer>,
246 pages: Vec<ByteBuffer>,
247 dtype: DType,
248 metadata: PcoMetadata,
249 len: usize,
250 validity: Validity,
251 ) -> Self {
252 Self {
253 chunk_metas,
254 pages,
255 metadata,
256 dtype,
257 unsliced_validity: validity,
258 unsliced_n_rows: len,
259 stats_set: Default::default(),
260 slice_start: 0,
261 slice_stop: len,
262 }
263 }
264
265 pub fn from_primitive(
266 parray: &PrimitiveArray,
267 level: usize,
268 values_per_page: usize,
269 ) -> VortexResult<Self> {
270 Self::from_primitive_with_values_per_chunk(parray, level, VALUES_PER_CHUNK, values_per_page)
271 }
272
273 pub(crate) fn from_primitive_with_values_per_chunk(
274 parray: &PrimitiveArray,
275 level: usize,
276 values_per_chunk: usize,
277 values_per_page: usize,
278 ) -> VortexResult<Self> {
279 let number_type = number_type_from_dtype(parray.dtype());
280 let values_per_page = if values_per_page == 0 {
281 values_per_chunk
282 } else {
283 values_per_page
284 };
285
286 let chunk_config = ChunkConfig::default()
288 .with_compression_level(level)
289 .with_paging_spec(PagingSpec::EqualPagesUpTo(values_per_page));
290
291 let values = collect_valid(parray)?;
292 let n_values = values.len();
293
294 let fc = FileCompressor::default();
295 let mut header = vec![];
296 fc.write_header(&mut header).map_err(vortex_err_from_pco)?;
297
298 let mut chunk_meta_buffers = vec![]; let mut chunk_infos = vec![]; let mut page_buffers = vec![];
301 for chunk_start in (0..n_values).step_by(values_per_chunk) {
302 let chunk_end = cmp::min(n_values, chunk_start + values_per_chunk);
303 let mut cc = match_number_enum!(
304 number_type,
305 NumberType<T> => {
306 let values = values.to_buffer::<T>();
307 let chunk = &values.as_slice()[chunk_start..chunk_end];
308 fc
309 .chunk_compressor(chunk, &chunk_config)
310 .map_err(vortex_err_from_pco)?
311 }
312 );
313
314 let mut chunk_meta_buffer = ByteBufferMut::with_capacity(cc.meta_size_hint());
315 cc.write_meta(&mut chunk_meta_buffer)
316 .map_err(vortex_err_from_pco)?;
317 chunk_meta_buffers.push(chunk_meta_buffer.freeze());
318
319 let mut page_infos = vec![];
320 for (page_idx, page_n_values) in cc.n_per_page().into_iter().enumerate() {
321 let mut page = ByteBufferMut::with_capacity(cc.page_size_hint(page_idx));
322 cc.write_page(page_idx, &mut page)
323 .map_err(vortex_err_from_pco)?;
324 page_buffers.push(page.freeze());
325 page_infos.push(PcoPageInfo {
326 n_values: u32::try_from(page_n_values)?,
327 });
328 }
329 chunk_infos.push(PcoChunkInfo { pages: page_infos })
330 }
331
332 let metadata = PcoMetadata {
333 header,
334 chunks: chunk_infos,
335 };
336 Ok(PcoArray::new(
337 chunk_meta_buffers,
338 page_buffers,
339 parray.dtype().clone(),
340 metadata,
341 parray.len(),
342 parray.validity().clone(),
343 ))
344 }
345
346 pub fn from_array(array: ArrayRef, level: usize, nums_per_page: usize) -> VortexResult<Self> {
347 if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
348 Self::from_primitive(parray, level, nums_per_page)
349 } else {
350 Err(vortex_err!("Pco can only encode primitive arrays"))
351 }
352 }
353
354 pub fn decompress(&self) -> VortexResult<PrimitiveArray> {
355 let number_type = number_type_from_dtype(&self.dtype);
358 let values_byte_buffer = match_number_enum!(
359 number_type,
360 NumberType<T> => {
361 self.decompress_values_typed::<T>()
362 }
363 );
364
365 Ok(PrimitiveArray::from_values_byte_buffer(
366 values_byte_buffer,
367 self.dtype.as_ptype(),
368 self.unsliced_validity
369 .slice(self.slice_start..self.slice_stop)?,
370 self.slice_stop - self.slice_start,
371 ))
372 }
373
374 #[allow(clippy::unwrap_in_result, clippy::unwrap_used)]
375 fn decompress_values_typed<T: Number>(&self) -> ByteBuffer {
376 let slice_value_indices = self
378 .unsliced_validity
379 .to_mask(self.unsliced_n_rows)
380 .valid_counts_for_indices(&[self.slice_start, self.slice_stop]);
381 let slice_value_start = slice_value_indices[0];
382 let slice_value_stop = slice_value_indices[1];
383 let slice_n_values = slice_value_stop - slice_value_start;
384
385 let (fd, _) = FileDecompressor::new(self.metadata.header.as_slice())
388 .map_err(vortex_err_from_pco)
389 .vortex_expect("FileDecompressor::new should succeed with valid header");
390 let mut decompressed_values = BufferMut::<T>::with_capacity(slice_n_values);
391 let mut page_idx = 0;
392 let mut page_value_start = 0;
393 let mut n_skipped_values = 0;
394 for (chunk_info, chunk_meta) in self.metadata.chunks.iter().zip(&self.chunk_metas) {
395 let mut cd: Option<ChunkDecompressor<T>> = None;
396 for page_info in &chunk_info.pages {
397 let page_n_values = page_info.n_values as usize;
398 let page_value_stop = page_value_start + page_n_values;
399
400 if page_value_start >= slice_value_stop {
401 break;
402 }
403
404 if page_value_stop > slice_value_start {
405 let old_len = decompressed_values.len();
407 let new_len = old_len + page_n_values;
408 decompressed_values.reserve(page_n_values);
409 unsafe {
410 decompressed_values.set_len(new_len);
411 }
412 let chunk_meta_bytes: &[u8] = chunk_meta.as_ref();
413 let page: &[u8] = self.pages[page_idx].as_ref();
414 if cd.is_none() {
415 let (new_cd, _) = fd
416 .chunk_decompressor(chunk_meta_bytes)
417 .map_err(vortex_err_from_pco)
418 .vortex_expect(
419 "chunk_decompressor should succeed with valid chunk metadata",
420 );
421 cd = Some(new_cd);
422 }
423 let mut pd = cd
424 .as_mut()
425 .unwrap()
426 .page_decompressor(page, page_n_values)
427 .map_err(vortex_err_from_pco)
428 .vortex_expect("page_decompressor should succeed with valid page data");
429 pd.read(&mut decompressed_values[old_len..new_len])
430 .map_err(vortex_err_from_pco)
431 .vortex_expect("decompress should succeed with valid compressed data");
432 } else {
433 n_skipped_values += page_n_values;
434 }
435
436 page_value_start = page_value_stop;
437 page_idx += 1;
438 }
439 }
440
441 let value_offset = slice_value_start - n_skipped_values;
443 decompressed_values
444 .freeze()
445 .slice(value_offset..value_offset + slice_n_values)
446 .into_byte_buffer()
447 }
448
449 pub(crate) fn _slice(&self, start: usize, stop: usize) -> Self {
450 PcoArray {
451 slice_start: self.slice_start + start,
452 slice_stop: self.slice_start + stop,
453 stats_set: Default::default(),
454 ..self.clone()
455 }
456 }
457
458 pub(crate) fn dtype(&self) -> &DType {
459 &self.dtype
460 }
461
462 pub(crate) fn slice_start(&self) -> usize {
463 self.slice_start
464 }
465
466 pub(crate) fn slice_stop(&self) -> usize {
467 self.slice_stop
468 }
469
470 pub(crate) fn unsliced_n_rows(&self) -> usize {
471 self.unsliced_n_rows
472 }
473}
474
475impl ValiditySliceHelper for PcoArray {
476 fn unsliced_validity_and_slice(&self) -> (&Validity, usize, usize) {
477 (&self.unsliced_validity, self.slice_start, self.slice_stop)
478 }
479}
480
481impl BaseArrayVTable<PcoVTable> for PcoVTable {
482 fn len(array: &PcoArray) -> usize {
483 array.slice_stop - array.slice_start
484 }
485
486 fn dtype(array: &PcoArray) -> &DType {
487 &array.dtype
488 }
489
490 fn stats(array: &PcoArray) -> StatsSetRef<'_> {
491 array.stats_set.to_ref(array.as_ref())
492 }
493
494 fn array_hash<H: std::hash::Hasher>(array: &PcoArray, state: &mut H, precision: Precision) {
495 array.dtype.hash(state);
496 array.unsliced_validity.array_hash(state, precision);
497 array.unsliced_n_rows.hash(state);
498 array.slice_start.hash(state);
499 array.slice_stop.hash(state);
500 for chunk_meta in &array.chunk_metas {
502 chunk_meta.array_hash(state, precision);
503 }
504 for page in &array.pages {
505 page.array_hash(state, precision);
506 }
507 }
508
509 fn array_eq(array: &PcoArray, other: &PcoArray, precision: Precision) -> bool {
510 if array.dtype != other.dtype
511 || !array
512 .unsliced_validity
513 .array_eq(&other.unsliced_validity, precision)
514 || array.unsliced_n_rows != other.unsliced_n_rows
515 || array.slice_start != other.slice_start
516 || array.slice_stop != other.slice_stop
517 || array.chunk_metas.len() != other.chunk_metas.len()
518 || array.pages.len() != other.pages.len()
519 {
520 return false;
521 }
522 for (a, b) in array.chunk_metas.iter().zip(&other.chunk_metas) {
523 if !a.array_eq(b, precision) {
524 return false;
525 }
526 }
527 for (a, b) in array.pages.iter().zip(&other.pages) {
528 if !a.array_eq(b, precision) {
529 return false;
530 }
531 }
532 true
533 }
534}
535
536impl OperationsVTable<PcoVTable> for PcoVTable {
537 fn scalar_at(array: &PcoArray, index: usize) -> VortexResult<Scalar> {
538 array._slice(index, index + 1).decompress()?.scalar_at(0)
539 }
540}
541
542impl VisitorVTable<PcoVTable> for PcoVTable {
543 fn visit_buffers(array: &PcoArray, visitor: &mut dyn ArrayBufferVisitor) {
544 for (i, buffer) in array.chunk_metas.iter().enumerate() {
545 visitor.visit_buffer_handle(
546 &format!("chunk_meta_{i}"),
547 &BufferHandle::new_host(buffer.clone()),
548 );
549 }
550 for (i, buffer) in array.pages.iter().enumerate() {
551 visitor.visit_buffer_handle(
552 &format!("page_{i}"),
553 &BufferHandle::new_host(buffer.clone()),
554 );
555 }
556 }
557
558 fn nbuffers(array: &PcoArray) -> usize {
559 array.chunk_metas.len() + array.pages.len()
560 }
561
562 fn visit_children(array: &PcoArray, visitor: &mut dyn ArrayChildVisitor) {
563 visitor.visit_validity(&array.unsliced_validity, array.unsliced_n_rows());
564 }
565
566 fn nchildren(array: &PcoArray) -> usize {
567 validity_nchildren(&array.unsliced_validity)
568 }
569}
570
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;

    use crate::PcoArray;

    /// `[null, 20, 30, 40, 50, null]` as a nullable `u32` array (four valid values).
    fn nullable_values() -> PrimitiveArray {
        PrimitiveArray::new(
            Buffer::copy_from(vec![10u32, 20, 30, 40, 50, 60]),
            Validity::from_iter([false, true, true, true, true, false]),
        )
    }

    #[test]
    fn test_slice_nullable() {
        let pco = PcoArray::from_primitive(&nullable_values(), 0, 128).unwrap();
        assert_arrays_eq!(
            pco,
            PrimitiveArray::from_option_iter([
                None,
                Some(20u32),
                Some(30),
                Some(40),
                Some(50),
                None
            ])
        );

        let sliced = pco.slice(1..5).unwrap();
        let expected =
            PrimitiveArray::from_option_iter([Some(20u32), Some(30), Some(40), Some(50)])
                .into_array();
        assert_arrays_eq!(sliced, expected);
    }

    #[test]
    fn test_slice_skips_leading_pages() {
        // Two values per page: the four valid values span two pages and rows 3..5 only
        // touch the second page, so the first page must be skipped without decoding.
        let pco = PcoArray::from_primitive(&nullable_values(), 0, 2).unwrap();
        let sliced = pco.slice(3..5).unwrap();
        let expected = PrimitiveArray::from_option_iter([Some(40u32), Some(50)]).into_array();
        assert_arrays_eq!(sliced, expected);
    }

    #[test]
    fn test_slice_all_null() {
        // A window containing only nulls selects zero pco values.
        let pco = PcoArray::from_primitive(&nullable_values(), 0, 128).unwrap();
        let sliced = pco.slice(0..1).unwrap();
        let expected = PrimitiveArray::from_option_iter([None::<u32>]).into_array();
        assert_arrays_eq!(sliced, expected);
    }
}