dora_node_api/node/arrow_utils.rs

1use arrow::array::{ArrayData, BufferSpec};
2use dora_message::metadata::{ArrowTypeInfo, BufferOffset};
3
4pub fn required_data_size(array: &ArrayData) -> usize {
5    let mut next_offset = 0;
6    required_data_size_inner(array, &mut next_offset);
7    next_offset
8}
9fn required_data_size_inner(array: &ArrayData, next_offset: &mut usize) {
10    let layout = arrow::array::layout(array.data_type());
11    for (buffer, spec) in array.buffers().iter().zip(&layout.buffers) {
12        // consider alignment padding
13        if let BufferSpec::FixedWidth { alignment, .. } = spec {
14            *next_offset = (*next_offset + alignment - 1) / alignment * alignment;
15        }
16        *next_offset += buffer.len();
17    }
18    for child in array.child_data() {
19        required_data_size_inner(child, next_offset);
20    }
21}
22
23pub fn copy_array_into_sample(target_buffer: &mut [u8], arrow_array: &ArrayData) -> ArrowTypeInfo {
24    let mut next_offset = 0;
25    copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array)
26}
27
28fn copy_array_into_sample_inner(
29    target_buffer: &mut [u8],
30    next_offset: &mut usize,
31    arrow_array: &ArrayData,
32) -> ArrowTypeInfo {
33    let mut buffer_offsets = Vec::new();
34    let layout = arrow::array::layout(arrow_array.data_type());
35    for (buffer, spec) in arrow_array.buffers().iter().zip(&layout.buffers) {
36        let len = buffer.len();
37        assert!(
38            target_buffer[*next_offset..].len() >= len,
39            "target buffer too small (total_len: {}, offset: {}, required_len: {len})",
40            target_buffer.len(),
41            *next_offset,
42        );
43        // add alignment padding
44        if let BufferSpec::FixedWidth { alignment, .. } = spec {
45            *next_offset = (*next_offset + alignment - 1) / alignment * alignment;
46        }
47
48        target_buffer[*next_offset..][..len].copy_from_slice(buffer.as_slice());
49        buffer_offsets.push(BufferOffset {
50            offset: *next_offset,
51            len,
52        });
53        *next_offset += len;
54    }
55
56    let mut child_data = Vec::new();
57    for child in arrow_array.child_data() {
58        let child_type_info = copy_array_into_sample_inner(target_buffer, next_offset, child);
59        child_data.push(child_type_info);
60    }
61
62    ArrowTypeInfo {
63        data_type: arrow_array.data_type().clone(),
64        len: arrow_array.len(),
65        null_count: arrow_array.null_count(),
66        validity: arrow_array.nulls().map(|b| b.validity().to_owned()),
67        offset: arrow_array.offset(),
68        buffer_offsets,
69        child_data,
70    }
71}