Skip to main content

dora_node_api/node/
arrow_utils.rs

1//! Utility functions for converting Arrow arrays to/from raw data.
2//!
3use arrow::array::{ArrayData, BufferSpec};
4use dora_message::metadata::{ArrowTypeInfo, BufferOffset};
5use eyre::Context;
6
7/// Calculates the data size in bytes required for storing a continuous copy of the given Arrow
8/// array.
9pub fn required_data_size(array: &ArrayData) -> usize {
10    let mut next_offset = 0;
11    required_data_size_inner(array, &mut next_offset);
12    next_offset
13}
14fn required_data_size_inner(array: &ArrayData, next_offset: &mut usize) {
15    let layout = arrow::array::layout(array.data_type());
16    for (buffer, spec) in array.buffers().iter().zip(&layout.buffers) {
17        // consider alignment padding
18        if let BufferSpec::FixedWidth { alignment, .. } = *spec {
19            *next_offset = (*next_offset).div_ceil(alignment) * alignment;
20        }
21        *next_offset += buffer.len();
22    }
23    for child in array.child_data() {
24        required_data_size_inner(child, next_offset);
25    }
26}
27
28/// Copy the given Arrow array into the provided buffer.
29///
30/// If the Arrow array consists of multiple buffers, they are placed continuously in the target
31/// buffer (there might be some padding for alignment)
32///
33/// Panics if the buffer is not large enough.
34pub fn copy_array_into_sample(target_buffer: &mut [u8], arrow_array: &ArrayData) -> ArrowTypeInfo {
35    let mut next_offset = 0;
36    copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array)
37}
38
39fn copy_array_into_sample_inner(
40    target_buffer: &mut [u8],
41    next_offset: &mut usize,
42    arrow_array: &ArrayData,
43) -> ArrowTypeInfo {
44    let mut buffer_offsets = Vec::new();
45    let layout = arrow::array::layout(arrow_array.data_type());
46    for (buffer, spec) in arrow_array.buffers().iter().zip(&layout.buffers) {
47        let len = buffer.len();
48        assert!(
49            target_buffer[*next_offset..].len() >= len,
50            "target buffer too small (total_len: {}, offset: {}, required_len: {len})",
51            target_buffer.len(),
52            *next_offset,
53        );
54        // add alignment padding
55        if let BufferSpec::FixedWidth { alignment, .. } = *spec {
56            *next_offset = (*next_offset).div_ceil(alignment) * alignment;
57        }
58
59        target_buffer[*next_offset..][..len].copy_from_slice(buffer.as_slice());
60        buffer_offsets.push(BufferOffset {
61            offset: *next_offset,
62            len,
63        });
64        *next_offset += len;
65    }
66
67    let mut child_data = Vec::new();
68    for child in arrow_array.child_data() {
69        let child_type_info = copy_array_into_sample_inner(target_buffer, next_offset, child);
70        child_data.push(child_type_info);
71    }
72
73    ArrowTypeInfo {
74        data_type: arrow_array.data_type().clone(),
75        len: arrow_array.len(),
76        null_count: arrow_array.null_count(),
77        validity: arrow_array.nulls().map(|b| b.validity().to_owned()),
78        offset: arrow_array.offset(),
79        buffer_offsets,
80        child_data,
81    }
82}
83
84/// Tries to convert the given raw Arrow buffer into an Arrow array.
85///
86/// The `type_info` is required for decoding the `raw_buffer` correctly.
87pub fn buffer_into_arrow_array(
88    raw_buffer: &arrow::buffer::Buffer,
89    type_info: &ArrowTypeInfo,
90) -> eyre::Result<arrow::array::ArrayData> {
91    if raw_buffer.is_empty() {
92        return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type));
93    }
94
95    let mut buffers = Vec::new();
96    for BufferOffset { offset, len } in &type_info.buffer_offsets {
97        if offset.saturating_add(*len) > raw_buffer.len() {
98            eyre::bail!(
99                "Buffer length out of bounds: offset {} + len {} > buffer len {}",
100                offset,
101                len,
102                raw_buffer.len()
103            );
104        }
105        buffers.push(raw_buffer.slice_with_length(*offset, *len));
106    }
107
108    let mut child_data = Vec::new();
109    for child_type_info in &type_info.child_data {
110        child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?)
111    }
112
113    arrow::array::ArrayData::try_new(
114        type_info.data_type.clone(),
115        type_info.len,
116        type_info
117            .validity
118            .clone()
119            .map(arrow::buffer::Buffer::from_vec),
120        type_info.offset,
121        buffers,
122        child_data,
123    )
124    .context("Error creating Arrow array")
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::DataType;
    use dora_message::metadata::BufferOffset;

    /// Metadata whose buffer range lies outside the raw buffer must be rejected with an error
    /// by `buffer_into_arrow_array`.
    #[test]
    fn test_malicious_arrow_metadata_panics() {
        // Three `i32` values, i.e. 12 bytes of raw data.
        let raw_buffer = arrow::buffer::Buffer::from_slice_ref(&[1i32, 2, 3]);

        // MALICIOUS: the described range lies far beyond the 12 available bytes.
        let out_of_bounds = BufferOffset {
            offset: 1000,
            len: 100,
        };
        let type_info = ArrowTypeInfo {
            data_type: DataType::UInt8,
            len: 1,
            null_count: 0,
            validity: None,
            offset: 0,
            buffer_offsets: vec![out_of_bounds],
            child_data: vec![],
        };

        let outcome = buffer_into_arrow_array(&raw_buffer, &type_info);
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert_eq!(
            message,
            "Buffer length out of bounds: offset 1000 + len 100 > buffer len 12"
        );
    }
}