// arrow_message/traits/flattening.rs
use arrow::{
    array::{ArrayData, BufferSpec},
    buffer::{Buffer, MutableBuffer},
    datatypes::DataType,
    error::{ArrowError, Result},
};

use serde::{Deserialize, Serialize};

10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct BufferOffset {
12 pub len: usize,
13 pub offset: usize,
14}
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct ArrayDataLayout {
18 pub data_type: DataType,
19 pub len: usize,
20 pub null_bit_buffer: Option<Vec<u8>>,
21 pub offset: usize,
22 pub buffers: Vec<BufferOffset>,
23 pub child_data: Vec<ArrayDataLayout>,
24}
25
26pub trait ArrayDataFlattening {
27 fn layout_with_values(&self) -> (ArrayDataLayout, Buffer);
28 fn flattened(&self) -> Result<ArrayData>;
29
30 fn layout(&self) -> ArrayDataLayout;
31 fn required_size(&self) -> usize;
32 fn fill(&self, target: &mut [u8]);
33
34 fn from_layout_and_values(layout: ArrayDataLayout, values: Buffer) -> Result<ArrayData>;
35}
36
37impl ArrayDataFlattening for ArrayData {
38 fn layout(&self) -> ArrayDataLayout {
39 fn layout_inner(array: &ArrayData, next_offset: &mut usize) -> ArrayDataLayout {
40 let mut buffers = Vec::new();
41 let mut child_data = Vec::new();
42
43 let layout = arrow::array::layout(array.data_type());
44
45 for (buffer, spec) in array.buffers().iter().zip(&layout.buffers) {
46 if let BufferSpec::FixedWidth { alignment, .. } = spec {
47 *next_offset = (*next_offset).div_ceil(*alignment) * alignment;
48 }
49
50 buffers.push(BufferOffset {
51 len: buffer.len(),
52 offset: *next_offset,
53 });
54
55 *next_offset += buffer.len();
56 }
57
58 for child in array.child_data() {
59 child_data.push(layout_inner(child, next_offset));
60 }
61
62 ArrayDataLayout {
63 data_type: array.data_type().clone(),
64 len: array.len(),
65 null_bit_buffer: array.nulls().map(|b| b.validity().to_owned()),
66 offset: array.offset(),
67 buffers,
68 child_data,
69 }
70 }
71
72 let mut next_offset = 0;
73
74 layout_inner(self, &mut next_offset)
75 }
76
77 fn required_size(&self) -> usize {
78 fn required_size_inner(array: &ArrayData, next_offset: &mut usize) {
79 let layout = arrow::array::layout(array.data_type());
80
81 for (buffer, spec) in array.buffers().iter().zip(&layout.buffers) {
82 if let BufferSpec::FixedWidth { alignment, .. } = spec {
83 *next_offset = (*next_offset).div_ceil(*alignment) * alignment;
84 }
85
86 *next_offset += buffer.len();
87 }
88
89 for child in array.child_data() {
90 required_size_inner(child, next_offset);
91 }
92 }
93
94 let mut next_offset = 0;
95 required_size_inner(self, &mut next_offset);
96
97 next_offset
98 }
99
100 fn fill(&self, target: &mut [u8]) {
101 fn fill_inner(array: &ArrayData, next_offset: &mut usize, target: &mut [u8]) {
102 let layout = arrow::array::layout(array.data_type());
103
104 for (buffer, spec) in array.buffers().iter().zip(&layout.buffers) {
105 if let BufferSpec::FixedWidth { alignment, .. } = spec {
106 *next_offset = (*next_offset).div_ceil(*alignment) * alignment;
107 }
108
109 target[*next_offset..*next_offset + buffer.len()]
110 .copy_from_slice(buffer.as_slice());
111 *next_offset += buffer.len();
112 }
113
114 for child in array.child_data() {
115 fill_inner(child, next_offset, target);
116 }
117 }
118
119 let mut next_offset = 0;
120
121 fill_inner(self, &mut next_offset, target);
122 }
123
124 fn layout_with_values(&self) -> (ArrayDataLayout, Buffer) {
125 fn layout_inner(
126 array: &ArrayData,
127 next_offset: &mut usize,
128 data: &mut MutableBuffer,
129 ) -> ArrayDataLayout {
130 let mut buffers = Vec::new();
131 let mut child_data = Vec::new();
132
133 let layout = arrow::array::layout(array.data_type());
134
135 for (buffer, spec) in array.buffers().iter().zip(&layout.buffers) {
136 if let BufferSpec::FixedWidth { alignment, .. } = spec {
137 *next_offset = (*next_offset).div_ceil(*alignment) * alignment;
138 }
139
140 if (buffer.len() + *next_offset) - data.len() > 0 {
141 let space_needed = (buffer.len() + *next_offset) - data.len();
142 data.extend_zeros(space_needed);
143 }
144
145 data[*next_offset..*next_offset + buffer.len()].copy_from_slice(buffer.as_slice());
146
147 buffers.push(BufferOffset {
148 len: buffer.len(),
149 offset: *next_offset,
150 });
151
152 *next_offset += buffer.len();
153 }
154
155 for child in array.child_data() {
156 child_data.push(layout_inner(child, next_offset, data));
157 }
158
159 ArrayDataLayout {
160 data_type: array.data_type().clone(),
161 len: array.len(),
162 null_bit_buffer: array.nulls().map(|b| b.validity().to_owned()),
163 offset: array.offset(),
164 buffers,
165 child_data,
166 }
167 }
168
169 let mut data = MutableBuffer::new(64);
170
171 let mut next_offset = 0;
172 let layout = layout_inner(self, &mut next_offset, &mut data);
173
174 (layout, data.into())
175 }
176
177 fn from_layout_and_values(layout: ArrayDataLayout, values: Buffer) -> Result<ArrayData> {
178 fn inner(buffer: &Buffer, layout: ArrayDataLayout) -> Result<ArrayData> {
179 if buffer.is_empty() {
180 return Ok(ArrayData::new_empty(&layout.data_type));
181 }
182
183 let mut buffers = Vec::new();
184 let mut child_data = Vec::new();
185
186 for BufferOffset { offset, len } in layout.buffers {
187 buffers.push(buffer.slice_with_length(offset, len));
188 }
189
190 for child_data_data in layout.child_data {
191 child_data.push(inner(buffer, child_data_data)?)
192 }
193
194 ArrayData::try_new(
195 layout.data_type,
196 layout.len,
197 layout.null_bit_buffer.map(arrow::buffer::Buffer::from_vec),
198 layout.offset,
199 buffers,
200 child_data,
201 )
202 }
203
204 inner(&values, layout)
205 }
206
207 fn flattened(&self) -> Result<ArrayData> {
208 let (layout, values) = self.layout_with_values();
209
210 Self::from_layout_and_values(layout, values)
211 }
212}