1use vortex_array::serde::ArrayChildren;
5use vortex_array::validity::Validity;
6use vortex_array::vtable::{EncodeVTable, SerdeVTable, VisitorVTable};
7use vortex_array::{ArrayBufferVisitor, ArrayChildVisitor, ProstMetadata};
8use vortex_buffer::ByteBuffer;
9use vortex_dtype::DType;
10use vortex_error::{VortexResult, vortex_bail, vortex_ensure};
11
12use crate::{PcoArray, PcoEncoding, PcoVTable};
13
14#[derive(Clone, prost::Message)]
15pub struct PcoPageInfo {
16 #[prost(uint32, tag = "1")]
19 pub n_values: u32,
20}
21
22#[derive(Clone, prost::Message)]
25pub struct PcoChunkInfo {
26 #[prost(message, repeated, tag = "1")]
27 pub pages: Vec<PcoPageInfo>,
28}
29
30#[derive(Clone, prost::Message)]
31pub struct PcoMetadata {
32 #[prost(bytes, tag = "1")]
35 pub header: Vec<u8>,
36 #[prost(message, repeated, tag = "2")]
37 pub chunks: Vec<PcoChunkInfo>,
38}
39
40impl SerdeVTable<PcoVTable> for PcoVTable {
41 type Metadata = ProstMetadata<PcoMetadata>;
42
43 fn metadata(array: &PcoArray) -> VortexResult<Option<Self::Metadata>> {
44 Ok(Some(ProstMetadata(array.metadata.clone())))
45 }
46
47 fn build(
48 _encoding: &PcoEncoding,
49 dtype: &DType,
50 len: usize,
51 metadata: &PcoMetadata,
52 buffers: &[ByteBuffer],
53 children: &dyn ArrayChildren,
54 ) -> VortexResult<PcoArray> {
55 let validity = if children.is_empty() {
56 Validity::from(dtype.nullability())
57 } else if children.len() == 1 {
58 let validity = children.get(0, &Validity::DTYPE, len)?;
59 Validity::Array(validity)
60 } else {
61 vortex_bail!("PcoArray expected 0 or 1 child, got {}", children.len());
62 };
63
64 vortex_ensure!(buffers.len() >= metadata.chunks.len());
65 let chunk_metas = buffers[..metadata.chunks.len()].to_vec();
66 let pages = buffers[metadata.chunks.len()..].to_vec();
67
68 let expected_n_pages = metadata
69 .chunks
70 .iter()
71 .map(|info| info.pages.len())
72 .sum::<usize>();
73 vortex_ensure!(pages.len() == expected_n_pages);
74
75 Ok(PcoArray::new(
76 chunk_metas,
77 pages,
78 dtype.clone(),
79 metadata.clone(),
80 len,
81 validity,
82 ))
83 }
84}
85
86impl EncodeVTable<PcoVTable> for PcoVTable {
87 fn encode(
88 _encoding: &<PcoVTable as vortex_array::vtable::VTable>::Encoding,
89 canonical: &vortex_array::Canonical,
90 _like: Option<&PcoArray>,
91 ) -> VortexResult<Option<PcoArray>> {
92 let parray = canonical.clone().into_primitive();
93
94 Ok(Some(PcoArray::from_primitive(&parray, 3, 0)?))
95 }
96}
97
98impl VisitorVTable<PcoVTable> for PcoVTable {
99 fn visit_buffers(array: &PcoArray, visitor: &mut dyn ArrayBufferVisitor) {
100 for buffer in &array.chunk_metas {
101 visitor.visit_buffer(buffer);
102 }
103 for buffer in &array.pages {
104 visitor.visit_buffer(buffer);
105 }
106 }
107
108 fn visit_children(array: &PcoArray, visitor: &mut dyn ArrayChildVisitor) {
109 visitor.visit_validity(&array.unsliced_validity, array.unsliced_n_rows());
110 }
111}