1use std::cmp;
5use std::fmt::Debug;
6use std::hash::Hash;
7use std::sync::Arc;
8
9use pco::ChunkConfig;
10use pco::PagingSpec;
11use pco::data_types::Number;
12use pco::data_types::NumberType;
13use pco::errors::PcoError;
14use pco::match_number_enum;
15use pco::wrapped::ChunkDecompressor;
16use pco::wrapped::FileCompressor;
17use pco::wrapped::FileDecompressor;
18use prost::Message;
19use vortex_array::ArrayEq;
20use vortex_array::ArrayHash;
21use vortex_array::ArrayRef;
22use vortex_array::DynArray;
23use vortex_array::ExecutionCtx;
24use vortex_array::ExecutionResult;
25use vortex_array::IntoArray;
26use vortex_array::LEGACY_SESSION;
27use vortex_array::Precision;
28use vortex_array::ProstMetadata;
29use vortex_array::ToCanonical;
30use vortex_array::VortexSessionExecute;
31use vortex_array::arrays::Primitive;
32use vortex_array::arrays::PrimitiveArray;
33use vortex_array::buffer::BufferHandle;
34use vortex_array::dtype::DType;
35use vortex_array::dtype::PType;
36use vortex_array::dtype::half;
37use vortex_array::scalar::Scalar;
38use vortex_array::serde::ArrayChildren;
39use vortex_array::stats::ArrayStats;
40use vortex_array::stats::StatsSetRef;
41use vortex_array::validity::Validity;
42use vortex_array::vtable;
43use vortex_array::vtable::Array;
44use vortex_array::vtable::ArrayId;
45use vortex_array::vtable::OperationsVTable;
46use vortex_array::vtable::VTable;
47use vortex_array::vtable::ValidityHelper;
48use vortex_array::vtable::ValiditySliceHelper;
49use vortex_array::vtable::ValidityVTableFromValiditySliceHelper;
50use vortex_array::vtable::validity_nchildren;
51use vortex_array::vtable::validity_to_child;
52use vortex_buffer::BufferMut;
53use vortex_buffer::ByteBuffer;
54use vortex_buffer::ByteBufferMut;
55use vortex_error::VortexError;
56use vortex_error::VortexExpect as _;
57use vortex_error::VortexResult;
58use vortex_error::vortex_bail;
59use vortex_error::vortex_ensure;
60use vortex_error::vortex_err;
61use vortex_error::vortex_panic;
62use vortex_session::VortexSession;
63
64use crate::PcoChunkInfo;
65use crate::PcoMetadata;
66use crate::PcoPageInfo;
67
/// Default number of values per Pco chunk; reuses Pco's own default maximum
/// page size so that, by default, one chunk corresponds to one full page.
const VALUES_PER_CHUNK: usize = pco::DEFAULT_MAX_PAGE_N;
87
// Macro from `vortex_array` that generates the vtable boilerplate for the
// `Pco` encoding (presumably the `Array<Pco>` wiring used below).
vtable!(Pco);
89
/// VTable implementation wiring `PcoArray` into the Vortex array framework.
impl VTable for Pco {
    type Array = PcoArray;

    type Metadata = ProstMetadata<PcoMetadata>;
    type OperationsVTable = Self;
    // Validity queries are answered by slicing the stored unsliced validity;
    // see the `ValiditySliceHelper` impl on `PcoArray`.
    type ValidityVTable = ValidityVTableFromValiditySliceHelper;

    fn vtable(_array: &Self::Array) -> &Self {
        // The vtable is stateless, so a shared static-like reference suffices.
        &Pco
    }

    fn id(&self) -> ArrayId {
        Self::ID
    }

    /// Logical length of the (possibly sliced) array.
    fn len(array: &PcoArray) -> usize {
        array.slice_stop - array.slice_start
    }

    fn dtype(array: &PcoArray) -> &DType {
        &array.dtype
    }

    fn stats(array: &PcoArray) -> StatsSetRef<'_> {
        array.stats_set.to_ref(array.as_ref())
    }

    /// Hashes everything that determines logical content: dtype, validity,
    /// unsliced row count, slice bounds, and the raw compressed buffers.
    fn array_hash<H: std::hash::Hasher>(array: &PcoArray, state: &mut H, precision: Precision) {
        array.dtype.hash(state);
        array.unsliced_validity.array_hash(state, precision);
        array.unsliced_n_rows.hash(state);
        array.slice_start.hash(state);
        array.slice_stop.hash(state);
        for chunk_meta in &array.chunk_metas {
            chunk_meta.array_hash(state, precision);
        }
        for page in &array.pages {
            page.array_hash(state, precision);
        }
    }

    /// Structural equality over the same fields hashed by `array_hash`.
    fn array_eq(array: &PcoArray, other: &PcoArray, precision: Precision) -> bool {
        // Cheap field and length comparisons first; buffer contents last.
        if array.dtype != other.dtype
            || !array
                .unsliced_validity
                .array_eq(&other.unsliced_validity, precision)
            || array.unsliced_n_rows != other.unsliced_n_rows
            || array.slice_start != other.slice_start
            || array.slice_stop != other.slice_stop
            || array.chunk_metas.len() != other.chunk_metas.len()
            || array.pages.len() != other.pages.len()
        {
            return false;
        }
        for (a, b) in array.chunk_metas.iter().zip(&other.chunk_metas) {
            if !a.array_eq(b, precision) {
                return false;
            }
        }
        for (a, b) in array.pages.iter().zip(&other.pages) {
            if !a.array_eq(b, precision) {
                return false;
            }
        }
        true
    }

    /// Buffers are laid out as all chunk metas followed by all pages; `build`
    /// relies on this same ordering when reconstructing.
    fn nbuffers(array: &PcoArray) -> usize {
        array.chunk_metas.len() + array.pages.len()
    }

    fn buffer(array: &PcoArray, idx: usize) -> BufferHandle {
        if idx < array.chunk_metas.len() {
            BufferHandle::new_host(array.chunk_metas[idx].clone())
        } else {
            let page_idx = idx - array.chunk_metas.len();
            BufferHandle::new_host(array.pages[page_idx].clone())
        }
    }

    fn buffer_name(array: &PcoArray, idx: usize) -> Option<String> {
        if idx < array.chunk_metas.len() {
            Some(format!("chunk_meta_{idx}"))
        } else {
            Some(format!("page_{}", idx - array.chunk_metas.len()))
        }
    }

    /// The only (optional) child is the validity array.
    fn nchildren(array: &PcoArray) -> usize {
        validity_nchildren(&array.unsliced_validity)
    }

    fn child(array: &PcoArray, idx: usize) -> ArrayRef {
        // NOTE(review): `idx` only appears in the panic message; since
        // `nchildren` reports at most one child, any in-bounds `idx` must be 0,
        // but an out-of-range non-zero `idx` would still return the validity
        // child here rather than panic — confirm this is acceptable.
        validity_to_child(&array.unsliced_validity, array.unsliced_n_rows)
            .unwrap_or_else(|| vortex_panic!("PcoArray child index {idx} out of bounds"))
    }

    fn child_name(_array: &PcoArray, idx: usize) -> String {
        match idx {
            0 => "validity".to_string(),
            _ => vortex_panic!("PcoArray child_name index {idx} out of bounds"),
        }
    }

    fn metadata(array: &PcoArray) -> VortexResult<Self::Metadata> {
        Ok(ProstMetadata(array.metadata.clone()))
    }

    /// Metadata is serialized as protobuf via prost.
    fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
        Ok(Some(metadata.0.encode_to_vec()))
    }

    fn deserialize(
        bytes: &[u8],
        _dtype: &DType,
        _len: usize,
        _buffers: &[BufferHandle],
        _session: &VortexSession,
    ) -> VortexResult<Self::Metadata> {
        Ok(ProstMetadata(PcoMetadata::decode(bytes)?))
    }

    /// Reconstructs a `PcoArray` from its serialized parts, validating that
    /// buffer counts are consistent with the metadata's chunk/page layout.
    fn build(
        dtype: &DType,
        len: usize,
        metadata: &Self::Metadata,
        buffers: &[BufferHandle],
        children: &dyn ArrayChildren,
    ) -> VortexResult<PcoArray> {
        // Zero children means validity is implied by nullability; one child is
        // an explicit validity array.
        let validity = if children.is_empty() {
            Validity::from(dtype.nullability())
        } else if children.len() == 1 {
            let validity = children.get(0, &Validity::DTYPE, len)?;
            Validity::Array(validity)
        } else {
            vortex_bail!("PcoArray expected 0 or 1 child, got {}", children.len());
        };

        // Buffers are chunk metas first, then pages (matching `buffer`/`nbuffers`).
        vortex_ensure!(buffers.len() >= metadata.0.chunks.len());
        let chunk_metas = buffers[..metadata.0.chunks.len()]
            .iter()
            .map(|b| b.clone().try_to_host_sync())
            .collect::<VortexResult<Vec<_>>>()?;
        let pages = buffers[metadata.0.chunks.len()..]
            .iter()
            .map(|b| b.clone().try_to_host_sync())
            .collect::<VortexResult<Vec<_>>>()?;

        // Every page declared in the metadata must have a backing buffer.
        let expected_n_pages = metadata
            .0
            .chunks
            .iter()
            .map(|info| info.pages.len())
            .sum::<usize>();
        vortex_ensure!(pages.len() == expected_n_pages);

        Ok(PcoArray::new(
            chunk_metas,
            pages,
            dtype.clone(),
            metadata.0.clone(),
            len,
            validity,
        ))
    }

    /// Replaces the validity child (or reverts to nullability-implied validity
    /// when no children are supplied).
    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
        vortex_ensure!(
            children.len() <= 1,
            "PcoArray expects 0 or 1 children, got {}",
            children.len()
        );

        if children.is_empty() {
            array.unsliced_validity = Validity::from(array.dtype.nullability());
        } else {
            array.unsliced_validity =
                Validity::Array(children.into_iter().next().vortex_expect("validity child"));
        }

        Ok(())
    }

    /// Executing a Pco array fully decompresses it to a primitive array.
    fn execute(array: Arc<Array<Self>>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
        Ok(ExecutionResult::done(array.decompress(ctx)?.into_array()))
    }

    /// Delegates push-down/rewrite decisions to the crate's rule set.
    fn reduce_parent(
        array: &Array<Self>,
        parent: &ArrayRef,
        child_idx: usize,
    ) -> VortexResult<Option<ArrayRef>> {
        crate::rules::RULES.evaluate(array, parent, child_idx)
    }
}
286
287pub(crate) fn number_type_from_dtype(dtype: &DType) -> NumberType {
288 let ptype = dtype.as_ptype();
289 match ptype {
290 PType::F16 => NumberType::F16,
291 PType::F32 => NumberType::F32,
292 PType::F64 => NumberType::F64,
293 PType::I16 => NumberType::I16,
294 PType::I32 => NumberType::I32,
295 PType::I64 => NumberType::I64,
296 PType::U16 => NumberType::U16,
297 PType::U32 => NumberType::U32,
298 PType::U64 => NumberType::U64,
299 _ => unreachable!("PType not supported by Pco: {:?}", ptype),
300 }
301}
302
303fn collect_valid(parray: &PrimitiveArray) -> VortexResult<PrimitiveArray> {
304 let mask = parray.validity_mask()?;
305 Ok(parray.clone().into_array().filter(mask)?.to_primitive())
306}
307
308pub(crate) fn vortex_err_from_pco(err: PcoError) -> VortexError {
309 use pco::errors::ErrorKind::*;
310 match err.kind {
311 Io(io_kind) => VortexError::from(std::io::Error::new(io_kind, err.message)),
312 InvalidArgument => vortex_err!(InvalidArgument: "{}", err.message),
313 other => vortex_err!("Pco {:?} error: {}", other, err.message),
314 }
315}
316
/// Marker type for the Pco encoding; also serves as its stateless vtable.
#[derive(Clone, Debug)]
pub struct Pco;
319
impl Pco {
    /// Stable identifier under which this encoding is registered.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.pco");
}
323
/// An array whose values are compressed with the Pco (pcodec) numeric codec.
///
/// Data is held in Pco's "wrapped" format: a shared file header (inside
/// `metadata`), one serialized meta buffer per chunk, and one compressed
/// buffer per page. Only valid (non-null) values are compressed; validity is
/// stored separately. Slicing is lazy: `slice_start`/`slice_stop` narrow the
/// logical view without touching the compressed buffers.
#[derive(Clone, Debug)]
pub struct PcoArray {
    /// One serialized Pco chunk-meta buffer per chunk.
    pub(crate) chunk_metas: Vec<ByteBuffer>,
    /// All compressed pages, concatenated across chunks in order.
    pub(crate) pages: Vec<ByteBuffer>,
    /// Pco file header plus per-chunk/per-page value counts.
    pub(crate) metadata: PcoMetadata,
    dtype: DType,
    /// Validity over the full, unsliced row range.
    pub(crate) unsliced_validity: Validity,
    /// Total row count before any slicing.
    unsliced_n_rows: usize,
    stats_set: ArrayStats,
    /// Current logical window into the unsliced rows (half-open range).
    slice_start: usize,
    slice_stop: usize,
}
336
impl PcoArray {
    /// Constructs a `PcoArray` directly from already-compressed parts.
    ///
    /// `len` is the total (unsliced) row count; the array starts unsliced,
    /// i.e. `slice_start == 0` and `slice_stop == len`.
    pub fn new(
        chunk_metas: Vec<ByteBuffer>,
        pages: Vec<ByteBuffer>,
        dtype: DType,
        metadata: PcoMetadata,
        len: usize,
        validity: Validity,
    ) -> Self {
        Self {
            chunk_metas,
            pages,
            metadata,
            dtype,
            unsliced_validity: validity,
            unsliced_n_rows: len,
            stats_set: Default::default(),
            slice_start: 0,
            slice_stop: len,
        }
    }

    /// Compresses a primitive array using the default chunk size
    /// (`VALUES_PER_CHUNK`). A `values_per_page` of 0 means one page per
    /// chunk (see `from_primitive_with_values_per_chunk`).
    pub fn from_primitive(
        parray: &PrimitiveArray,
        level: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        Self::from_primitive_with_values_per_chunk(parray, level, VALUES_PER_CHUNK, values_per_page)
    }

    /// Compresses `parray` with Pco at the given compression `level`,
    /// splitting the valid values into chunks of `values_per_chunk` and pages
    /// of up to `values_per_page` values.
    ///
    /// Nulls are filtered out before compression; only valid values are
    /// written. Page value counts are recorded in the metadata so that
    /// decompression can skip pages outside a requested slice.
    pub(crate) fn from_primitive_with_values_per_chunk(
        parray: &PrimitiveArray,
        level: usize,
        values_per_chunk: usize,
        values_per_page: usize,
    ) -> VortexResult<Self> {
        let number_type = number_type_from_dtype(parray.dtype());
        // A page size of 0 means "one page per chunk".
        let values_per_page = if values_per_page == 0 {
            values_per_chunk
        } else {
            values_per_page
        };

        let chunk_config = ChunkConfig::default()
            .with_compression_level(level)
            .with_paging_spec(PagingSpec::EqualPagesUpTo(values_per_page));

        // Only the valid (non-null) values are compressed.
        let values = collect_valid(parray)?;
        let n_values = values.len();

        let fc = FileCompressor::default();
        let mut header = vec![];
        fc.write_header(&mut header).map_err(vortex_err_from_pco)?;

        let mut chunk_meta_buffers = vec![];
        let mut chunk_infos = vec![];
        let mut page_buffers = vec![];
        for chunk_start in (0..n_values).step_by(values_per_chunk) {
            let chunk_end = cmp::min(n_values, chunk_start + values_per_chunk);
            // Dispatch on the runtime number type to get a typed chunk
            // compressor over this chunk's value range.
            let mut cc = match_number_enum!(
                number_type,
                NumberType<T> => {
                    let values = values.to_buffer::<T>();
                    let chunk = &values.as_slice()[chunk_start..chunk_end];
                    fc
                        .chunk_compressor(chunk, &chunk_config)
                        .map_err(vortex_err_from_pco)?
                }
            );

            // Serialize this chunk's metadata.
            let mut chunk_meta_buffer = ByteBufferMut::with_capacity(cc.meta_size_hint());
            cc.write_meta(&mut chunk_meta_buffer)
                .map_err(vortex_err_from_pco)?;
            chunk_meta_buffers.push(chunk_meta_buffer.freeze());

            // Serialize each page and record its value count, enabling
            // page-level skipping during sliced decompression.
            let mut page_infos = vec![];
            for (page_idx, page_n_values) in cc.n_per_page().into_iter().enumerate() {
                let mut page = ByteBufferMut::with_capacity(cc.page_size_hint(page_idx));
                cc.write_page(page_idx, &mut page)
                    .map_err(vortex_err_from_pco)?;
                page_buffers.push(page.freeze());
                page_infos.push(PcoPageInfo {
                    n_values: u32::try_from(page_n_values)?,
                });
            }
            chunk_infos.push(PcoChunkInfo { pages: page_infos })
        }

        let metadata = PcoMetadata {
            header,
            chunks: chunk_infos,
        };
        Ok(PcoArray::new(
            chunk_meta_buffers,
            page_buffers,
            parray.dtype().clone(),
            metadata,
            parray.len(),
            parray.validity().clone(),
        ))
    }

    /// Compresses any primitive-encoded array; errors for other encodings.
    pub fn from_array(array: ArrayRef, level: usize, nums_per_page: usize) -> VortexResult<Self> {
        if let Some(parray) = array.as_opt::<Primitive>() {
            Self::from_primitive(parray, level, nums_per_page)
        } else {
            Err(vortex_err!("Pco can only encode primitive arrays"))
        }
    }

    /// Decompresses the current sliced view back into a `PrimitiveArray`,
    /// re-attaching the matching slice of the validity.
    pub fn decompress(&self, ctx: &mut ExecutionCtx) -> VortexResult<PrimitiveArray> {
        let number_type = number_type_from_dtype(&self.dtype);
        let values_byte_buffer = match_number_enum!(
            number_type,
            NumberType<T> => {
                self.decompress_values_typed::<T>(ctx)?
            }
        );

        Ok(PrimitiveArray::from_values_byte_buffer(
            values_byte_buffer,
            self.dtype.as_ptype(),
            self.unsliced_validity
                .slice(self.slice_start..self.slice_stop)?,
            self.slice_stop - self.slice_start,
        ))
    }

    /// Decompresses exactly the valid values covered by the current slice.
    ///
    /// Only compressed pages overlapping the slice are decoded; pages ending
    /// before the slice are skipped, and pages starting after it are never
    /// visited.
    fn decompress_values_typed<T: Number>(
        &self,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<ByteBuffer> {
        // Translate row positions into counts of *valid* values, since only
        // valid values were compressed (nulls are filtered at compression).
        let slice_value_indices = self
            .unsliced_validity
            .execute_mask(self.unsliced_n_rows, ctx)?
            .valid_counts_for_indices(&[self.slice_start, self.slice_stop]);
        let slice_value_start = slice_value_indices[0];
        let slice_value_stop = slice_value_indices[1];
        let slice_n_values = slice_value_stop - slice_value_start;

        let (fd, _) =
            FileDecompressor::new(self.metadata.header.as_slice()).map_err(vortex_err_from_pco)?;
        let mut decompressed_values = BufferMut::<T>::with_capacity(slice_n_values);
        let mut page_idx = 0;
        // Running valid-value offset at the start of the current page.
        let mut page_value_start = 0;
        // Total values in pages skipped because they end before the slice.
        let mut n_skipped_values = 0;
        for (chunk_info, chunk_meta) in self.metadata.chunks.iter().zip(&self.chunk_metas) {
            // Created lazily so a chunk whose pages are all skipped never pays
            // for chunk-meta decoding.
            let mut chunk_decompressor: Option<ChunkDecompressor<T>> = None;
            for page_info in &chunk_info.pages {
                let page_n_values = page_info.n_values as usize;
                let page_value_stop = page_value_start + page_n_values;

                if page_value_start >= slice_value_stop {
                    // This page and everything after it lie past the slice.
                    break;
                }

                if page_value_stop > slice_value_start {
                    // Page overlaps the slice: decompress it in full.
                    let old_len = decompressed_values.len();
                    let new_len = old_len + page_n_values;
                    decompressed_values.reserve(page_n_values);
                    // SAFETY: capacity for `new_len` elements was reserved
                    // just above, and the elements in `old_len..new_len` are
                    // written by `pd.read` below before being exposed.
                    unsafe {
                        decompressed_values.set_len(new_len);
                    }
                    let page: &[u8] = self.pages[page_idx].as_ref();

                    let mut cd = match chunk_decompressor.take() {
                        Some(d) => d,
                        None => {
                            let (new_cd, _) = fd
                                .chunk_decompressor(chunk_meta.as_ref())
                                .map_err(vortex_err_from_pco)?;
                            new_cd
                        }
                    };

                    let mut pd = cd
                        .page_decompressor(page, page_n_values)
                        .map_err(vortex_err_from_pco)?;
                    pd.read(&mut decompressed_values[old_len..new_len])
                        .map_err(vortex_err_from_pco)?;

                    chunk_decompressor = Some(cd);
                } else {
                    // Page ends before the slice starts: skip it entirely.
                    n_skipped_values += page_n_values;
                }

                page_value_start = page_value_stop;
                page_idx += 1;
            }
        }

        // The buffer holds whole pages, so trim leading values that precede
        // the slice (decompressed but unwanted) plus anything trailing it.
        let value_offset = slice_value_start - n_skipped_values;
        Ok(decompressed_values
            .freeze()
            .slice(value_offset..value_offset + slice_n_values)
            .into_byte_buffer())
    }

    /// Returns a narrowed view; `start`/`stop` are relative to the current
    /// slice. Compressed buffers are shared; cached stats are reset.
    pub(crate) fn _slice(&self, start: usize, stop: usize) -> Self {
        PcoArray {
            slice_start: self.slice_start + start,
            slice_stop: self.slice_start + stop,
            stats_set: Default::default(),
            ..self.clone()
        }
    }

    pub(crate) fn dtype(&self) -> &DType {
        &self.dtype
    }

    /// Start of the logical window within the unsliced rows.
    pub(crate) fn slice_start(&self) -> usize {
        self.slice_start
    }

    /// Exclusive end of the logical window within the unsliced rows.
    pub(crate) fn slice_stop(&self) -> usize {
        self.slice_stop
    }

    /// Total row count before any slicing.
    pub(crate) fn unsliced_n_rows(&self) -> usize {
        self.unsliced_n_rows
    }
}
569
/// Lets the framework derive this array's validity by slicing the stored
/// unsliced validity down to the current window.
impl ValiditySliceHelper for PcoArray {
    fn unsliced_validity_and_slice(&self) -> (&Validity, usize, usize) {
        (&self.unsliced_validity, self.slice_start, self.slice_stop)
    }
}
575
576impl OperationsVTable<Pco> for Pco {
577 fn scalar_at(array: &PcoArray, index: usize, _ctx: &mut ExecutionCtx) -> VortexResult<Scalar> {
578 let mut ctx = LEGACY_SESSION.create_execution_ctx();
579 array
580 ._slice(index, index + 1)
581 .decompress(&mut ctx)?
582 .scalar_at(0)
583 }
584}
585
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use crate::PcoArray;

    /// Round-trips a nullable array through Pco compression and checks that
    /// slicing the compressed array decompresses to the matching slice of the
    /// original.
    #[test]
    fn test_slice_nullable() {
        // Positions 0 and 5 are null; their stored values (10, 60) are
        // filtered out before compression.
        let values = PrimitiveArray::new(
            buffer![10u32, 20, 30, 40, 50, 60],
            Validity::from_iter([false, true, true, true, true, false]),
        );
        // Compression level 0; 128 values per page, so everything fits in one.
        let pco = PcoArray::from_primitive(&values, 0, 128).unwrap();
        assert_arrays_eq!(
            pco,
            PrimitiveArray::from_option_iter([
                None,
                Some(20u32),
                Some(30),
                Some(40),
                Some(50),
                None
            ])
        );

        // Slice away the null endpoints and verify the remaining window.
        let sliced = pco.slice(1..5).unwrap();
        let expected =
            PrimitiveArray::from_option_iter([Some(20u32), Some(30), Some(40), Some(50)])
                .into_array();
        assert_arrays_eq!(sliced, expected);
    }
}
623}