1use arrow_array::{ArrayRef, make_array};
5use arrow_buffer::Buffer;
6use arrow_data::{ArrayDataBuilder, transform::MutableArrayData};
7use arrow_schema::{ArrowError, DataType};
8
9use crate::DataTypeExt;
10
/// File-local shorthand for results whose error type is [`ArrowError`].
type Result<T> = std::result::Result<T, ArrowError>;
12
/// Largest buffer size, in bytes, that [`try_inline_value`] will return as an
/// inline value; scalars whose single buffer is longer are not inlined.
pub const INLINE_VALUE_MAX_BYTES: usize = 32;
14
15pub fn extract_scalar_value(array: &ArrayRef, idx: usize) -> Result<ArrayRef> {
16 if idx >= array.len() {
17 return Err(ArrowError::InvalidArgumentError(
18 "Scalar index out of bounds".to_string(),
19 ));
20 }
21
22 let data = array.to_data();
23 let mut mutable = MutableArrayData::new(vec![&data], true, 1);
24 mutable.extend(0, idx, idx + 1);
25 Ok(make_array(mutable.freeze()))
26}
27
28fn read_u32(buf: &[u8], offset: &mut usize) -> Result<u32> {
29 if *offset + 4 > buf.len() {
30 return Err(ArrowError::InvalidArgumentError(
31 "Invalid scalar value buffer: unexpected EOF".to_string(),
32 ));
33 }
34 let bytes = [
35 buf[*offset],
36 buf[*offset + 1],
37 buf[*offset + 2],
38 buf[*offset + 3],
39 ];
40 *offset += 4;
41 Ok(u32::from_le_bytes(bytes))
42}
43
44fn read_bytes<'a>(buf: &'a [u8], offset: &mut usize, len: usize) -> Result<&'a [u8]> {
45 if *offset + len > buf.len() {
46 return Err(ArrowError::InvalidArgumentError(
47 "Invalid scalar value buffer: unexpected EOF".to_string(),
48 ));
49 }
50 let slice = &buf[*offset..*offset + len];
51 *offset += len;
52 Ok(slice)
53}
54
/// Appends `v` to `out` as four little-endian bytes.
fn write_u32(out: &mut Vec<u8>, v: u32) {
    let le_bytes = v.to_le_bytes();
    out.extend(le_bytes.iter().copied());
}
58
/// Appends a copy of `bytes` to the end of `out`.
fn write_bytes(out: &mut Vec<u8>, bytes: &[u8]) {
    out.extend(bytes.iter().copied());
}
62
63pub fn encode_scalar_value_buffer(scalar: &ArrayRef) -> Result<Vec<u8>> {
64 if scalar.len() != 1 || scalar.null_count() != 0 {
65 return Err(ArrowError::InvalidArgumentError(
66 "Scalar value buffer must be a single non-null value".to_string(),
67 ));
68 }
69 let data = scalar.to_data();
70 if data.offset() != 0 {
71 return Err(ArrowError::InvalidArgumentError(
72 "Scalar value buffer must have offset=0".to_string(),
73 ));
74 }
75 if !data.child_data().is_empty() {
76 return Err(ArrowError::InvalidArgumentError(
77 "Scalar value buffer does not support nested types".to_string(),
78 ));
79 }
80
81 let mut out = Vec::with_capacity(128);
88 let buffers = data.buffers();
89 write_u32(&mut out, buffers.len() as u32);
90 for b in buffers {
91 write_u32(&mut out, b.len() as u32);
92 }
93 for b in buffers {
94 write_bytes(&mut out, b.as_slice());
95 }
96 Ok(out)
97}
98
99pub fn decode_scalar_from_value_buffer(
100 data_type: &DataType,
101 value_buffer: &[u8],
102) -> Result<ArrayRef> {
103 if matches!(
104 data_type,
105 DataType::Struct(_) | DataType::FixedSizeList(_, _)
106 ) {
107 return Err(ArrowError::InvalidArgumentError(format!(
108 "Scalar value buffer does not support nested data type {:?}",
109 data_type
110 )));
111 }
112
113 let mut offset = 0;
114 let num_buffers = read_u32(value_buffer, &mut offset)? as usize;
115 let buffer_lens = (0..num_buffers)
116 .map(|_| read_u32(value_buffer, &mut offset).map(|l| l as usize))
117 .collect::<Result<Vec<_>>>()?;
118
119 let mut buffers = Vec::with_capacity(num_buffers);
120 for len in buffer_lens {
121 let bytes = read_bytes(value_buffer, &mut offset, len)?;
122 buffers.push(Buffer::from_vec(bytes.to_vec()));
123 }
124
125 if offset != value_buffer.len() {
126 return Err(ArrowError::InvalidArgumentError(
127 "Invalid scalar value buffer: trailing bytes".to_string(),
128 ));
129 }
130
131 let mut builder = ArrayDataBuilder::new(data_type.clone())
132 .len(1)
133 .null_count(0);
134 for b in buffers {
135 builder = builder.add_buffer(b);
136 }
137 Ok(make_array(builder.build()?))
138}
139
140pub fn decode_scalar_from_inline_value(
141 data_type: &DataType,
142 inline_value: &[u8],
143) -> Result<ArrayRef> {
144 if matches!(data_type, DataType::Boolean) {
151 debug_assert_eq!(
152 inline_value.len(),
153 1,
154 "Invalid boolean inline scalar length (expected 1 byte, got {})",
155 inline_value.len()
156 );
157 } else if let Some(byte_width) = data_type.byte_width_opt() {
158 debug_assert_eq!(
159 inline_value.len(),
160 byte_width,
161 "Inline constant length mismatch for {:?}: expected {} bytes but got {}",
162 data_type,
163 byte_width,
164 inline_value.len()
165 );
166 }
167
168 let data = ArrayDataBuilder::new(data_type.clone())
169 .len(1)
170 .null_count(0)
171 .add_buffer(Buffer::from_vec(inline_value.to_vec()))
172 .build()?;
173 Ok(make_array(data))
174}
175
176pub fn try_inline_value(scalar: &ArrayRef) -> Option<Vec<u8>> {
177 if scalar.null_count() != 0 || scalar.len() != 1 {
178 return None;
179 }
180 let data = scalar.to_data();
181 if !data.child_data().is_empty() {
182 return None;
183 }
184 if data.buffers().len() != 1 {
185 return None;
186 }
187 let bytes = data.buffers()[0].as_slice();
188 if bytes.len() > INLINE_VALUE_MAX_BYTES {
189 return None;
190 }
191 Some(bytes.to_vec())
192}
193
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{BooleanArray, FixedSizeBinaryArray, Int32Array, StringArray, cast::AsArray};

    use super::*;

    /// Picking index 2 of `[1, null, 3]` yields a one-element array holding 3.
    #[test]
    fn test_extract_scalar_value() {
        let source: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
        let picked = extract_scalar_value(&source, 2).unwrap();
        assert_eq!(picked.len(), 1);
        let ints = picked.as_primitive::<arrow_array::types::Int32Type>();
        assert_eq!(ints.value(0), 3);
    }

    /// A UTF-8 scalar survives encode -> decode through the value buffer.
    #[test]
    fn test_scalar_value_buffer_utf8_round_trip() {
        let original: ArrayRef = Arc::new(StringArray::from(vec!["hello"]));
        let encoded = encode_scalar_value_buffer(&original).unwrap();
        let restored = decode_scalar_from_value_buffer(&DataType::Utf8, &encoded).unwrap();
        assert_eq!(restored.len(), 1);
        assert_eq!(restored.null_count(), 0);
        let strings = restored.as_string::<i32>();
        assert_eq!(strings.value(0), "hello");
    }

    /// A 33-byte fixed-size binary scalar (one byte more than
    /// `INLINE_VALUE_MAX_BYTES`) survives encode -> decode.
    #[test]
    fn test_scalar_value_buffer_fixed_size_binary_round_trip() {
        let payload = vec![0xABu8; 33];
        let fsb = FixedSizeBinaryArray::try_from_sparse_iter_with_size(
            std::iter::once(Some(payload.as_slice())),
            33,
        )
        .unwrap();
        let original: ArrayRef = Arc::new(fsb);
        let encoded = encode_scalar_value_buffer(&original).unwrap();
        let restored =
            decode_scalar_from_value_buffer(&DataType::FixedSizeBinary(33), &encoded).unwrap();
        assert_eq!(restored.len(), 1);
        assert_eq!(
            restored.as_fixed_size_binary().value(0),
            payload.as_slice()
        );
    }

    /// A boolean scalar survives the inline path.
    #[test]
    fn test_inline_value_boolean_round_trip() {
        let original: ArrayRef = Arc::new(BooleanArray::from_iter([Some(true)]));
        let inline_bytes = try_inline_value(&original).unwrap();
        let restored =
            decode_scalar_from_inline_value(&DataType::Boolean, &inline_bytes).unwrap();
        assert_eq!(restored.len(), 1);
        assert_eq!(restored.null_count(), 0);
        assert!(restored.as_boolean().value(0));
    }

    /// Encoding refuses an array that carries child data.
    #[test]
    fn test_scalar_value_buffer_rejects_nested_type() {
        let item_field = Arc::new(arrow_schema::Field::new("item", DataType::Int32, false));
        let child_values = Arc::new(Int32Array::from(vec![1, 2]));
        let fixed_list = arrow_array::FixedSizeListArray::new(item_field, 2, child_values, None);
        let list: ArrayRef = Arc::new(fixed_list);
        let scalar = list.slice(0, 1);
        let outcome = encode_scalar_value_buffer(&scalar);
        assert!(outcome.is_err());
    }

    /// Decoding refuses a struct type before looking at the (empty) input.
    #[test]
    fn test_decode_scalar_from_value_buffer_rejects_nested_type() {
        let empty_input: Vec<u8> = Vec::new();
        let struct_type = DataType::Struct(arrow_schema::Fields::empty());
        let outcome = decode_scalar_from_value_buffer(&struct_type, &empty_input);
        assert!(outcome.is_err());
    }

    /// Bytes left over after the declared buffers are an error.
    #[test]
    fn test_decode_scalar_from_value_buffer_trailing_bytes() {
        // Header declares zero buffers, so the single extra byte is trailing.
        let zero_buffers_header = 0u32.to_le_bytes();
        let mut input = zero_buffers_header.to_vec();
        input.push(1);
        let outcome = decode_scalar_from_value_buffer(&DataType::Int32, &input);
        assert!(outcome.is_err());
    }
}