1use std::cmp;
5use std::fmt::Debug;
6use std::hash::Hash;
7
8use pco::ChunkConfig;
9use pco::PagingSpec;
10use pco::data_types::Number;
11use pco::data_types::NumberType;
12use pco::errors::PcoError;
13use pco::match_number_enum;
14use pco::wrapped::ChunkDecompressor;
15use pco::wrapped::FileCompressor;
16use pco::wrapped::FileDecompressor;
17use prost::Message;
18use vortex_array::ArrayEq;
19use vortex_array::ArrayHash;
20use vortex_array::ArrayRef;
21use vortex_array::DynArray;
22use vortex_array::ExecutionCtx;
23use vortex_array::ExecutionStep;
24use vortex_array::IntoArray;
25use vortex_array::Precision;
26use vortex_array::ProstMetadata;
27use vortex_array::ToCanonical;
28use vortex_array::arrays::PrimitiveArray;
29use vortex_array::arrays::PrimitiveVTable;
30use vortex_array::buffer::BufferHandle;
31use vortex_array::dtype::DType;
32use vortex_array::dtype::PType;
33use vortex_array::dtype::half;
34use vortex_array::scalar::Scalar;
35use vortex_array::serde::ArrayChildren;
36use vortex_array::stats::ArrayStats;
37use vortex_array::stats::StatsSetRef;
38use vortex_array::validity::Validity;
39use vortex_array::vtable;
40use vortex_array::vtable::ArrayId;
41use vortex_array::vtable::OperationsVTable;
42use vortex_array::vtable::VTable;
43use vortex_array::vtable::ValidityHelper;
44use vortex_array::vtable::ValiditySliceHelper;
45use vortex_array::vtable::ValidityVTableFromValiditySliceHelper;
46use vortex_array::vtable::validity_nchildren;
47use vortex_array::vtable::validity_to_child;
48use vortex_buffer::BufferMut;
49use vortex_buffer::ByteBuffer;
50use vortex_buffer::ByteBufferMut;
51use vortex_error::VortexError;
52use vortex_error::VortexExpect as _;
53use vortex_error::VortexResult;
54use vortex_error::vortex_bail;
55use vortex_error::vortex_ensure;
56use vortex_error::vortex_err;
57use vortex_error::vortex_panic;
58use vortex_session::VortexSession;
59
60use crate::PcoChunkInfo;
61use crate::PcoMetadata;
62use crate::PcoPageInfo;
63
// Number of values packed into each pco chunk when compressing; pages
// subdivide chunks (see `from_primitive_with_values_per_chunk`).
const VALUES_PER_CHUNK: usize = pco::DEFAULT_MAX_PAGE_N;

// NOTE(review): generates the registration glue for the "Pco" encoding;
// `PcoVTable` itself is declared manually below — confirm what this macro
// expands to against `vortex_array::vtable!`.
vtable!(Pco);
85
impl VTable for PcoVTable {
    type Array = PcoArray;

    type Metadata = ProstMetadata<PcoMetadata>;
    type OperationsVTable = Self;
    // Validity queries are derived by slicing the stored unsliced validity;
    // see the `ValiditySliceHelper` impl on `PcoArray` below.
    type ValidityVTable = ValidityVTableFromValiditySliceHelper;

    fn id(_array: &Self::Array) -> ArrayId {
        Self::ID
    }

    /// Logical length is the width of the current slice window, not the
    /// number of encoded rows.
    fn len(array: &PcoArray) -> usize {
        array.slice_stop - array.slice_start
    }

    fn dtype(array: &PcoArray) -> &DType {
        &array.dtype
    }

    fn stats(array: &PcoArray) -> StatsSetRef<'_> {
        array.stats_set.to_ref(array.as_ref())
    }

    /// Hashes the compressed representation directly (dtype, validity, row
    /// count, slice bounds, and the raw chunk-meta/page bytes) without
    /// decompressing.
    fn array_hash<H: std::hash::Hasher>(array: &PcoArray, state: &mut H, precision: Precision) {
        array.dtype.hash(state);
        array.unsliced_validity.array_hash(state, precision);
        array.unsliced_n_rows.hash(state);
        array.slice_start.hash(state);
        array.slice_stop.hash(state);
        for chunk_meta in &array.chunk_metas {
            chunk_meta.array_hash(state, precision);
        }
        for page in &array.pages {
            page.array_hash(state, precision);
        }
    }

    /// Equality over the compressed representation. `metadata` (header bytes
    /// and per-page counts) is not compared directly; the serialized
    /// chunk-meta and page buffers stand in for it.
    // NOTE(review): assumes equal buffers imply equal `PcoMetadata` — confirm.
    fn array_eq(array: &PcoArray, other: &PcoArray, precision: Precision) -> bool {
        // Cheap field comparisons first; bail before touching buffer bytes.
        if array.dtype != other.dtype
            || !array
                .unsliced_validity
                .array_eq(&other.unsliced_validity, precision)
            || array.unsliced_n_rows != other.unsliced_n_rows
            || array.slice_start != other.slice_start
            || array.slice_stop != other.slice_stop
            || array.chunk_metas.len() != other.chunk_metas.len()
            || array.pages.len() != other.pages.len()
        {
            return false;
        }
        for (a, b) in array.chunk_metas.iter().zip(&other.chunk_metas) {
            if !a.array_eq(b, precision) {
                return false;
            }
        }
        for (a, b) in array.pages.iter().zip(&other.pages) {
            if !a.array_eq(b, precision) {
                return false;
            }
        }
        true
    }

    // Buffer layout invariant (relied upon by `buffer`, `buffer_name`, and
    // `build`): all chunk-meta buffers first, then all page buffers.
    fn nbuffers(array: &PcoArray) -> usize {
        array.chunk_metas.len() + array.pages.len()
    }

    fn buffer(array: &PcoArray, idx: usize) -> BufferHandle {
        if idx < array.chunk_metas.len() {
            BufferHandle::new_host(array.chunk_metas[idx].clone())
        } else {
            let page_idx = idx - array.chunk_metas.len();
            BufferHandle::new_host(array.pages[page_idx].clone())
        }
    }

    fn buffer_name(array: &PcoArray, idx: usize) -> Option<String> {
        if idx < array.chunk_metas.len() {
            Some(format!("chunk_meta_{idx}"))
        } else {
            Some(format!("page_{}", idx - array.chunk_metas.len()))
        }
    }

    fn nchildren(array: &PcoArray) -> usize {
        validity_nchildren(&array.unsliced_validity)
    }

    /// The only possible child is the validity array; `idx` is used solely
    /// for the panic message since `nchildren` is at most 1.
    fn child(array: &PcoArray, idx: usize) -> ArrayRef {
        validity_to_child(&array.unsliced_validity, array.unsliced_n_rows)
            .unwrap_or_else(|| vortex_panic!("PcoArray child index {idx} out of bounds"))
    }

    fn child_name(_array: &PcoArray, idx: usize) -> String {
        match idx {
            0 => "validity".to_string(),
            _ => vortex_panic!("PcoArray child_name index {idx} out of bounds"),
        }
    }

    fn metadata(array: &PcoArray) -> VortexResult<Self::Metadata> {
        Ok(ProstMetadata(array.metadata.clone()))
    }

    /// Metadata round-trips through protobuf (prost) encoding.
    fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
        Ok(Some(metadata.0.encode_to_vec()))
    }

    fn deserialize(
        bytes: &[u8],
        _dtype: &DType,
        _len: usize,
        _buffers: &[BufferHandle],
        _session: &VortexSession,
    ) -> VortexResult<Self::Metadata> {
        Ok(ProstMetadata(PcoMetadata::decode(bytes)?))
    }

    /// Reconstructs a `PcoArray` from its serialized parts, validating the
    /// buffer layout (chunk metas first, then pages) against the metadata.
    fn build(
        dtype: &DType,
        len: usize,
        metadata: &Self::Metadata,
        buffers: &[BufferHandle],
        children: &dyn ArrayChildren,
    ) -> VortexResult<PcoArray> {
        // Zero children => validity is implied by the dtype's nullability;
        // one child => an explicit validity array.
        let validity = if children.is_empty() {
            Validity::from(dtype.nullability())
        } else if children.len() == 1 {
            let validity = children.get(0, &Validity::DTYPE, len)?;
            Validity::Array(validity)
        } else {
            vortex_bail!("PcoArray expected 0 or 1 child, got {}", children.len());
        };

        // The first `chunks.len()` buffers are chunk metadata; the rest are pages.
        vortex_ensure!(buffers.len() >= metadata.0.chunks.len());
        let chunk_metas = buffers[..metadata.0.chunks.len()]
            .iter()
            .map(|b| b.clone().try_to_host_sync())
            .collect::<VortexResult<Vec<_>>>()?;
        let pages = buffers[metadata.0.chunks.len()..]
            .iter()
            .map(|b| b.clone().try_to_host_sync())
            .collect::<VortexResult<Vec<_>>>()?;

        // Every page listed in the metadata must have a corresponding buffer.
        let expected_n_pages = metadata
            .0
            .chunks
            .iter()
            .map(|info| info.pages.len())
            .sum::<usize>();
        vortex_ensure!(pages.len() == expected_n_pages);

        Ok(PcoArray::new(
            chunk_metas,
            pages,
            dtype.clone(),
            metadata.0.clone(),
            len,
            validity,
        ))
    }

    /// Replaces the (optional) validity child; mirrors the child layout
    /// accepted by `build`.
    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
        vortex_ensure!(
            children.len() <= 1,
            "PcoArray expects 0 or 1 children, got {}",
            children.len()
        );

        if children.is_empty() {
            array.unsliced_validity = Validity::from(array.dtype.nullability());
        } else {
            array.unsliced_validity =
                Validity::Array(children.into_iter().next().vortex_expect("validity child"));
        }

        Ok(())
    }

    /// Executing a Pco array means fully decompressing it to a primitive array.
    fn execute(array: &Self::Array, _ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
        Ok(ExecutionStep::Done(array.decompress()?.into_array()))
    }

    fn reduce_parent(
        array: &Self::Array,
        parent: &ArrayRef,
        child_idx: usize,
    ) -> VortexResult<Option<ArrayRef>> {
        crate::rules::RULES.evaluate(array, parent, child_idx)
    }
}
278
279pub(crate) fn number_type_from_dtype(dtype: &DType) -> NumberType {
280 let ptype = dtype.as_ptype();
281 match ptype {
282 PType::F16 => NumberType::F16,
283 PType::F32 => NumberType::F32,
284 PType::F64 => NumberType::F64,
285 PType::I16 => NumberType::I16,
286 PType::I32 => NumberType::I32,
287 PType::I64 => NumberType::I64,
288 PType::U16 => NumberType::U16,
289 PType::U32 => NumberType::U32,
290 PType::U64 => NumberType::U64,
291 _ => unreachable!("PType not supported by Pco: {:?}", ptype),
292 }
293}
294
295fn collect_valid(parray: &PrimitiveArray) -> VortexResult<PrimitiveArray> {
296 let mask = parray.validity_mask()?;
297 Ok(parray.clone().into_array().filter(mask)?.to_primitive())
298}
299
300pub(crate) fn vortex_err_from_pco(err: PcoError) -> VortexError {
301 use pco::errors::ErrorKind::*;
302 match err.kind {
303 Io(io_kind) => VortexError::from(std::io::Error::new(io_kind, err.message)),
304 InvalidArgument => vortex_err!(InvalidArgument: "{}", err.message),
305 other => vortex_err!("Pco {:?} error: {}", other, err.message),
306 }
307}
308
/// Marker type implementing the Pco encoding's [`VTable`].
#[derive(Debug)]
pub struct PcoVTable;

impl PcoVTable {
    /// Stable identifier under which this encoding is registered.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.pco");
}
315
/// A primitive array compressed with pco (the "pcodec" numerical codec).
///
/// Only valid (non-null) values are compressed; validity is stored alongside.
/// Slicing is lazy: the compressed data always covers all
/// `unsliced_n_rows` rows, and `slice_start..slice_stop` selects the
/// logical window.
#[derive(Clone, Debug)]
pub struct PcoArray {
    // Serialized pco chunk metadata, one buffer per chunk.
    pub(crate) chunk_metas: Vec<ByteBuffer>,
    // Serialized pco pages, flattened across all chunks in chunk order.
    pub(crate) pages: Vec<ByteBuffer>,
    // File header bytes plus per-chunk page layout (value counts).
    pub(crate) metadata: PcoMetadata,
    dtype: DType,
    // Validity of the *unsliced* array.
    pub(crate) unsliced_validity: Validity,
    unsliced_n_rows: usize,
    stats_set: ArrayStats,
    // Half-open logical window into the unsliced rows.
    slice_start: usize,
    slice_stop: usize,
}
328
impl PcoArray {
    /// Builds an unsliced array (`slice = 0..len`) from already-compressed
    /// parts. `len` is the total (unsliced) row count including nulls.
    pub fn new(
        chunk_metas: Vec<ByteBuffer>,
        pages: Vec<ByteBuffer>,
        dtype: DType,
        metadata: PcoMetadata,
        len: usize,
        validity: Validity,
    ) -> Self {
        Self {
            chunk_metas,
            pages,
            metadata,
            dtype,
            unsliced_validity: validity,
            unsliced_n_rows: len,
            stats_set: Default::default(),
            slice_start: 0,
            slice_stop: len,
        }
    }

    /// Compresses `parray` using the default chunk size (`VALUES_PER_CHUNK`).
    pub fn from_primitive(
        parray: &PrimitiveArray,
        level: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        Self::from_primitive_with_values_per_chunk(parray, level, VALUES_PER_CHUNK, values_per_page)
    }

    /// Compresses `parray` with explicit chunk/page sizing.
    ///
    /// Null slots are stripped before compression (see `collect_valid`);
    /// only valid values are fed to pco, and validity is kept separately.
    /// A `values_per_page` of 0 means one page per chunk.
    pub(crate) fn from_primitive_with_values_per_chunk(
        parray: &PrimitiveArray,
        level: usize,
        values_per_chunk: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        let number_type = number_type_from_dtype(parray.dtype());
        let values_per_page = if values_per_page == 0 {
            values_per_chunk
        } else {
            values_per_page
        };

        let chunk_config = ChunkConfig::default()
            .with_compression_level(level)
            .with_paging_spec(PagingSpec::EqualPagesUpTo(values_per_page));

        let values = collect_valid(parray)?;
        let n_values = values.len();

        let fc = FileCompressor::default();
        let mut header = vec![];
        fc.write_header(&mut header).map_err(vortex_err_from_pco)?;

        let mut chunk_meta_buffers = vec![];
        let mut chunk_infos = vec![];
        let mut page_buffers = vec![];
        // One pco chunk per `values_per_chunk` valid values; the final chunk
        // may be shorter.
        for chunk_start in (0..n_values).step_by(values_per_chunk) {
            let chunk_end = cmp::min(n_values, chunk_start + values_per_chunk);
            let mut cc = match_number_enum!(
                number_type,
                NumberType<T> => {
                    // NOTE(review): `to_buffer` runs once per chunk — presumably
                    // a cheap reinterpret rather than a copy; confirm.
                    let values = values.to_buffer::<T>();
                    let chunk = &values.as_slice()[chunk_start..chunk_end];
                    fc
                        .chunk_compressor(chunk, &chunk_config)
                        .map_err(vortex_err_from_pco)?
                }
            );

            let mut chunk_meta_buffer = ByteBufferMut::with_capacity(cc.meta_size_hint());
            cc.write_meta(&mut chunk_meta_buffer)
                .map_err(vortex_err_from_pco)?;
            chunk_meta_buffers.push(chunk_meta_buffer.freeze());

            // Record each page's value count so decompression can locate the
            // pages overlapping a slice without decoding them.
            let mut page_infos = vec![];
            for (page_idx, page_n_values) in cc.n_per_page().into_iter().enumerate() {
                let mut page = ByteBufferMut::with_capacity(cc.page_size_hint(page_idx));
                cc.write_page(page_idx, &mut page)
                    .map_err(vortex_err_from_pco)?;
                page_buffers.push(page.freeze());
                page_infos.push(PcoPageInfo {
                    n_values: u32::try_from(page_n_values)?,
                });
            }
            chunk_infos.push(PcoChunkInfo { pages: page_infos })
        }

        let metadata = PcoMetadata {
            header,
            chunks: chunk_infos,
        };
        Ok(PcoArray::new(
            chunk_meta_buffers,
            page_buffers,
            parray.dtype().clone(),
            metadata,
            parray.len(),
            parray.validity().clone(),
        ))
    }

    /// Compresses any primitive-encoded array; errors for other encodings.
    pub fn from_array(array: ArrayRef, level: usize, nums_per_page: usize) -> VortexResult<Self> {
        if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
            Self::from_primitive(parray, level, nums_per_page)
        } else {
            Err(vortex_err!("Pco can only encode primitive arrays"))
        }
    }

    /// Decompresses the current slice window into a canonical primitive array,
    /// reattaching the (sliced) validity.
    pub fn decompress(&self) -> VortexResult<PrimitiveArray> {
        let number_type = number_type_from_dtype(&self.dtype);
        let values_byte_buffer = match_number_enum!(
            number_type,
            NumberType<T> => {
                self.decompress_values_typed::<T>()?
            }
        );

        Ok(PrimitiveArray::from_values_byte_buffer(
            values_byte_buffer,
            self.dtype.as_ptype(),
            self.unsliced_validity
                .slice(self.slice_start..self.slice_stop)?,
            self.slice_stop - self.slice_start,
        ))
    }

    /// Decompresses only the *valid values* overlapping the slice window.
    ///
    /// Because nulls were stripped at compression time, row indices must
    /// first be translated into value-space offsets via the validity mask.
    /// Pages wholly outside the window are skipped; overlapping pages are
    /// decompressed in full and the result trimmed at the end.
    fn decompress_values_typed<T: Number>(&self) -> VortexResult<ByteBuffer> {
        // Counts of valid rows strictly before each slice boundary, i.e. the
        // value-space range [slice_value_start, slice_value_stop).
        let slice_value_indices = self
            .unsliced_validity
            .to_mask(self.unsliced_n_rows)
            .valid_counts_for_indices(&[self.slice_start, self.slice_stop]);
        let slice_value_start = slice_value_indices[0];
        let slice_value_stop = slice_value_indices[1];
        let slice_n_values = slice_value_stop - slice_value_start;

        let (fd, _) =
            FileDecompressor::new(self.metadata.header.as_slice()).map_err(vortex_err_from_pco)?;
        let mut decompressed_values = BufferMut::<T>::with_capacity(slice_n_values);
        let mut page_idx = 0;
        // Value-space offset of the current page's first value.
        let mut page_value_start = 0;
        // Values in pages skipped before the window; used to trim at the end.
        let mut n_skipped_values = 0;
        for (chunk_info, chunk_meta) in self.metadata.chunks.iter().zip(&self.chunk_metas) {
            // Lazily created on the first overlapping page of this chunk,
            // then reused for subsequent pages.
            let mut chunk_decompressor: Option<ChunkDecompressor<T>> = None;
            for page_info in &chunk_info.pages {
                let page_n_values = page_info.n_values as usize;
                let page_value_stop = page_value_start + page_n_values;

                // All remaining pages start at or after the window end.
                if page_value_start >= slice_value_stop {
                    break;
                }

                if page_value_stop > slice_value_start {
                    // Page overlaps the window: decompress it in full.
                    let old_len = decompressed_values.len();
                    let new_len = old_len + page_n_values;
                    decompressed_values.reserve(page_n_values);
                    // SAFETY: capacity for `new_len` was just reserved; the
                    // `pd.read` below fills [old_len..new_len] before the
                    // region is ever read.
                    unsafe {
                        decompressed_values.set_len(new_len);
                    }
                    let page: &[u8] = self.pages[page_idx].as_ref();

                    let mut cd = match chunk_decompressor.take() {
                        Some(d) => d,
                        None => {
                            let (new_cd, _) = fd
                                .chunk_decompressor(chunk_meta.as_ref())
                                .map_err(vortex_err_from_pco)?;
                            new_cd
                        }
                    };

                    let mut pd = cd
                        .page_decompressor(page, page_n_values)
                        .map_err(vortex_err_from_pco)?;
                    pd.read(&mut decompressed_values[old_len..new_len])
                        .map_err(vortex_err_from_pco)?;

                    chunk_decompressor = Some(cd);
                } else {
                    // Page ends before the window starts: skip it.
                    n_skipped_values += page_n_values;
                }

                page_value_start = page_value_stop;
                page_idx += 1;
            }
        }

        // Trim the leading values of the first overlapping page that precede
        // the window, then take exactly the window's value count.
        let value_offset = slice_value_start - n_skipped_values;
        Ok(decompressed_values
            .freeze()
            .slice(value_offset..value_offset + slice_n_values)
            .into_byte_buffer())
    }

    /// Narrows the logical window without touching compressed data.
    /// `start`/`stop` are relative to the current slice. Stats are reset
    /// since they no longer describe the narrowed window.
    pub(crate) fn _slice(&self, start: usize, stop: usize) -> Self {
        PcoArray {
            slice_start: self.slice_start + start,
            slice_stop: self.slice_start + stop,
            stats_set: Default::default(),
            ..self.clone()
        }
    }

    pub(crate) fn dtype(&self) -> &DType {
        &self.dtype
    }

    pub(crate) fn slice_start(&self) -> usize {
        self.slice_start
    }

    pub(crate) fn slice_stop(&self) -> usize {
        self.slice_stop
    }

    pub(crate) fn unsliced_n_rows(&self) -> usize {
        self.unsliced_n_rows
    }
}
558
impl ValiditySliceHelper for PcoArray {
    /// Exposes the unsliced validity plus the current window so the shared
    /// validity vtable (`ValidityVTableFromValiditySliceHelper`) can derive
    /// sliced validity on demand.
    fn unsliced_validity_and_slice(&self) -> (&Validity, usize, usize) {
        (&self.unsliced_validity, self.slice_start, self.slice_stop)
    }
}
564
impl OperationsVTable<PcoVTable> for PcoVTable {
    /// Reads one scalar by decompressing a single-element slice.
    // Each call decompresses every page overlapping `index` in full, so
    // repeated random access is relatively expensive.
    fn scalar_at(array: &PcoArray, index: usize) -> VortexResult<Scalar> {
        array._slice(index, index + 1).decompress()?.scalar_at(0)
    }
}
570
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use crate::PcoArray;

    /// Round-trips a nullable array through Pco compression, then checks
    /// that slicing the compressed form matches slicing the original.
    #[test]
    fn test_slice_nullable() {
        // Nulls at both ends exercise the row-to-value index translation.
        let values = PrimitiveArray::new(
            buffer![10u32, 20, 30, 40, 50, 60],
            Validity::from_iter([false, true, true, true, true, false]),
        );
        let pco = PcoArray::from_primitive(&values, 0, 128).unwrap();
        assert_arrays_eq!(
            pco,
            PrimitiveArray::from_option_iter([
                None,
                Some(20u32),
                Some(30),
                Some(40),
                Some(50),
                None
            ])
        );

        // Slice away the null endpoints and compare against the dense form.
        let sliced = pco.slice(1..5).unwrap();
        let expected =
            PrimitiveArray::from_option_iter([Some(20u32), Some(30), Some(40), Some(50)])
                .into_array();
        assert_arrays_eq!(sliced, expected);
    }
}