arrow_message/traits/
flattening.rs

1use arrow::{
2    array::{ArrayData, BufferSpec},
3    buffer::{Buffer, MutableBuffer},
4    datatypes::DataType,
5    error::Result,
6};
7
8use serde::{Deserialize, Serialize};
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct BufferOffset {
12    pub len: usize,
13    pub offset: usize,
14}
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct ArrayDataLayout {
18    pub data_type: DataType,
19    pub len: usize,
20    pub null_bit_buffer: Option<Vec<u8>>,
21    pub offset: usize,
22    pub buffers: Vec<BufferOffset>,
23    pub child_data: Vec<ArrayDataLayout>,
24}
25
26pub trait ArrayDataFlattening {
27    fn layout_with_values(&self) -> (ArrayDataLayout, Buffer);
28    fn flattened(&self) -> Result<ArrayData>;
29
30    fn layout(&self) -> ArrayDataLayout;
31    fn required_size(&self) -> usize;
32    fn fill(&self, target: &mut [u8]);
33
34    fn from_layout_and_values(layout: ArrayDataLayout, values: Buffer) -> Result<ArrayData>;
35}
36
37impl ArrayDataFlattening for ArrayData {
38    fn layout(&self) -> ArrayDataLayout {
39        fn layout_inner(array: &ArrayData, next_offset: &mut usize) -> ArrayDataLayout {
40            let mut buffers = Vec::new();
41            let mut child_data = Vec::new();
42
43            let layout = arrow::array::layout(array.data_type());
44
45            for (buffer, spec) in array.buffers().iter().zip(&layout.buffers) {
46                if let BufferSpec::FixedWidth { alignment, .. } = spec {
47                    *next_offset = (*next_offset).div_ceil(*alignment) * alignment;
48                }
49
50                buffers.push(BufferOffset {
51                    len: buffer.len(),
52                    offset: *next_offset,
53                });
54
55                *next_offset += buffer.len();
56            }
57
58            for child in array.child_data() {
59                child_data.push(layout_inner(child, next_offset));
60            }
61
62            ArrayDataLayout {
63                data_type: array.data_type().clone(),
64                len: array.len(),
65                null_bit_buffer: array.nulls().map(|b| b.validity().to_owned()),
66                offset: array.offset(),
67                buffers,
68                child_data,
69            }
70        }
71
72        let mut next_offset = 0;
73
74        layout_inner(self, &mut next_offset)
75    }
76
77    fn required_size(&self) -> usize {
78        fn required_size_inner(array: &ArrayData, next_offset: &mut usize) {
79            let layout = arrow::array::layout(array.data_type());
80
81            for (buffer, spec) in array.buffers().iter().zip(&layout.buffers) {
82                if let BufferSpec::FixedWidth { alignment, .. } = spec {
83                    *next_offset = (*next_offset).div_ceil(*alignment) * alignment;
84                }
85
86                *next_offset += buffer.len();
87            }
88
89            for child in array.child_data() {
90                required_size_inner(child, next_offset);
91            }
92        }
93
94        let mut next_offset = 0;
95        required_size_inner(self, &mut next_offset);
96
97        next_offset
98    }
99
100    fn fill(&self, target: &mut [u8]) {
101        fn fill_inner(array: &ArrayData, next_offset: &mut usize, target: &mut [u8]) {
102            let layout = arrow::array::layout(array.data_type());
103
104            for (buffer, spec) in array.buffers().iter().zip(&layout.buffers) {
105                if let BufferSpec::FixedWidth { alignment, .. } = spec {
106                    *next_offset = (*next_offset).div_ceil(*alignment) * alignment;
107                }
108
109                target[*next_offset..*next_offset + buffer.len()]
110                    .copy_from_slice(buffer.as_slice());
111                *next_offset += buffer.len();
112            }
113
114            for child in array.child_data() {
115                fill_inner(child, next_offset, target);
116            }
117        }
118
119        let mut next_offset = 0;
120
121        fill_inner(self, &mut next_offset, target);
122    }
123
124    fn layout_with_values(&self) -> (ArrayDataLayout, Buffer) {
125        fn layout_inner(
126            array: &ArrayData,
127            next_offset: &mut usize,
128            data: &mut MutableBuffer,
129        ) -> ArrayDataLayout {
130            let mut buffers = Vec::new();
131            let mut child_data = Vec::new();
132
133            let layout = arrow::array::layout(array.data_type());
134
135            for (buffer, spec) in array.buffers().iter().zip(&layout.buffers) {
136                if let BufferSpec::FixedWidth { alignment, .. } = spec {
137                    *next_offset = (*next_offset).div_ceil(*alignment) * alignment;
138                }
139
140                if (buffer.len() + *next_offset) - data.len() > 0 {
141                    let space_needed = (buffer.len() + *next_offset) - data.len();
142                    data.extend_zeros(space_needed);
143                }
144
145                data[*next_offset..*next_offset + buffer.len()].copy_from_slice(buffer.as_slice());
146
147                buffers.push(BufferOffset {
148                    len: buffer.len(),
149                    offset: *next_offset,
150                });
151
152                *next_offset += buffer.len();
153            }
154
155            for child in array.child_data() {
156                child_data.push(layout_inner(child, next_offset, data));
157            }
158
159            ArrayDataLayout {
160                data_type: array.data_type().clone(),
161                len: array.len(),
162                null_bit_buffer: array.nulls().map(|b| b.validity().to_owned()),
163                offset: array.offset(),
164                buffers,
165                child_data,
166            }
167        }
168
169        let mut data = MutableBuffer::new(64);
170
171        let mut next_offset = 0;
172        let layout = layout_inner(self, &mut next_offset, &mut data);
173
174        (layout, data.into())
175    }
176
177    fn from_layout_and_values(layout: ArrayDataLayout, values: Buffer) -> Result<ArrayData> {
178        fn inner(buffer: &Buffer, layout: ArrayDataLayout) -> Result<ArrayData> {
179            if buffer.is_empty() {
180                return Ok(ArrayData::new_empty(&layout.data_type));
181            }
182
183            let mut buffers = Vec::new();
184            let mut child_data = Vec::new();
185
186            for BufferOffset { offset, len } in layout.buffers {
187                buffers.push(buffer.slice_with_length(offset, len));
188            }
189
190            for child_data_data in layout.child_data {
191                child_data.push(inner(buffer, child_data_data)?)
192            }
193
194            ArrayData::try_new(
195                layout.data_type,
196                layout.len,
197                layout.null_bit_buffer.map(arrow::buffer::Buffer::from_vec),
198                layout.offset,
199                buffers,
200                child_data,
201            )
202        }
203
204        inner(&values, layout)
205    }
206
207    fn flattened(&self) -> Result<ArrayData> {
208        let (layout, values) = self.layout_with_values();
209
210        Self::from_layout_and_values(layout, values)
211    }
212}