dora_node_api/node/
arrow_utils.rs1use arrow::array::{ArrayData, BufferSpec};
4use dora_message::metadata::{ArrowTypeInfo, BufferOffset};
5use eyre::Context;
6
7pub fn required_data_size(array: &ArrayData) -> usize {
10 let mut next_offset = 0;
11 required_data_size_inner(array, &mut next_offset);
12 next_offset
13}
14fn required_data_size_inner(array: &ArrayData, next_offset: &mut usize) {
15 let layout = arrow::array::layout(array.data_type());
16 for (buffer, spec) in array.buffers().iter().zip(&layout.buffers) {
17 if let BufferSpec::FixedWidth { alignment, .. } = *spec {
19 *next_offset = (*next_offset).div_ceil(alignment) * alignment;
20 }
21 *next_offset += buffer.len();
22 }
23 for child in array.child_data() {
24 required_data_size_inner(child, next_offset);
25 }
26}
27
28pub fn copy_array_into_sample(target_buffer: &mut [u8], arrow_array: &ArrayData) -> ArrowTypeInfo {
35 let mut next_offset = 0;
36 copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array)
37}
38
39fn copy_array_into_sample_inner(
40 target_buffer: &mut [u8],
41 next_offset: &mut usize,
42 arrow_array: &ArrayData,
43) -> ArrowTypeInfo {
44 let mut buffer_offsets = Vec::new();
45 let layout = arrow::array::layout(arrow_array.data_type());
46 for (buffer, spec) in arrow_array.buffers().iter().zip(&layout.buffers) {
47 let len = buffer.len();
48 assert!(
49 target_buffer[*next_offset..].len() >= len,
50 "target buffer too small (total_len: {}, offset: {}, required_len: {len})",
51 target_buffer.len(),
52 *next_offset,
53 );
54 if let BufferSpec::FixedWidth { alignment, .. } = *spec {
56 *next_offset = (*next_offset).div_ceil(alignment) * alignment;
57 }
58
59 target_buffer[*next_offset..][..len].copy_from_slice(buffer.as_slice());
60 buffer_offsets.push(BufferOffset {
61 offset: *next_offset,
62 len,
63 });
64 *next_offset += len;
65 }
66
67 let mut child_data = Vec::new();
68 for child in arrow_array.child_data() {
69 let child_type_info = copy_array_into_sample_inner(target_buffer, next_offset, child);
70 child_data.push(child_type_info);
71 }
72
73 ArrowTypeInfo {
74 data_type: arrow_array.data_type().clone(),
75 len: arrow_array.len(),
76 null_count: arrow_array.null_count(),
77 validity: arrow_array.nulls().map(|b| b.validity().to_owned()),
78 offset: arrow_array.offset(),
79 buffer_offsets,
80 child_data,
81 }
82}
83
84pub fn buffer_into_arrow_array(
88 raw_buffer: &arrow::buffer::Buffer,
89 type_info: &ArrowTypeInfo,
90) -> eyre::Result<arrow::array::ArrayData> {
91 if raw_buffer.is_empty() {
92 return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type));
93 }
94
95 let mut buffers = Vec::new();
96 for BufferOffset { offset, len } in &type_info.buffer_offsets {
97 if offset.saturating_add(*len) > raw_buffer.len() {
98 eyre::bail!(
99 "Buffer length out of bounds: offset {} + len {} > buffer len {}",
100 offset,
101 len,
102 raw_buffer.len()
103 );
104 }
105 buffers.push(raw_buffer.slice_with_length(*offset, *len));
106 }
107
108 let mut child_data = Vec::new();
109 for child_type_info in &type_info.child_data {
110 child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?)
111 }
112
113 arrow::array::ArrayData::try_new(
114 type_info.data_type.clone(),
115 type_info.len,
116 type_info
117 .validity
118 .clone()
119 .map(arrow::buffer::Buffer::from_vec),
120 type_info.offset,
121 buffers,
122 child_data,
123 )
124 .context("Error creating Arrow array")
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::DataType;
    use dora_message::metadata::BufferOffset;

    /// Metadata that points outside the raw buffer must be rejected.
    ///
    /// Despite the test's name, the expected outcome is an `Err` with a
    /// descriptive message, not a panic.
    #[test]
    fn test_malicious_arrow_metadata_panics() {
        // The asserted message below reports this buffer as 12 bytes long.
        let raw_buffer = arrow::buffer::Buffer::from_slice_ref(&[1, 2, 3]);

        // A region that starts and ends far beyond the end of `raw_buffer`.
        let out_of_bounds_region = BufferOffset {
            offset: 1000,
            len: 100,
        };
        let type_info = ArrowTypeInfo {
            data_type: DataType::UInt8,
            len: 1,
            null_count: 0,
            validity: None,
            offset: 0,
            buffer_offsets: vec![out_of_bounds_region],
            child_data: vec![],
        };

        let result = buffer_into_arrow_array(&raw_buffer, &type_info);
        assert!(result.is_err());

        let message = result.unwrap_err().to_string();
        assert_eq!(
            message,
            "Buffer length out of bounds: offset 1000 + len 100 > buffer len 12"
        );
    }
}