1use std::sync::{Arc, OnceLock};
4
5use arrow::array::{
6 Array, ArrayRef, Float32Array, Int32Array, Int64Array, ListArray, StructArray, UInt8Array,
7};
8use arrow::buffer::OffsetBuffer;
9use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
10use arrow::record_batch::RecordBatch;
11
12#[allow(missing_docs)]
14pub struct CameraRgb<'a> {
15 pub pixels: &'a [u8],
16 pub width: i32,
17 pub height: i32,
18 pub fx: f32,
19 pub fy: f32,
20 pub cx: f32,
21 pub cy: f32,
22 pub timestamp_ns: i64,
23}
24
25#[derive(Debug, Clone, PartialEq)]
27#[allow(missing_docs)]
28pub struct CameraRgbOwned {
29 pub pixels: Vec<u8>,
30 pub width: i32,
31 pub height: i32,
32 pub fx: f32,
33 pub fy: f32,
34 pub cx: f32,
35 pub cy: f32,
36 pub timestamp_ns: i64,
37}
38
39pub fn schema() -> SchemaRef {
41 static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
42 SCHEMA
43 .get_or_init(|| {
44 Arc::new(Schema::new(vec![
45 Field::new(
46 "pixels",
47 DataType::List(Arc::new(Field::new("item", DataType::UInt8, false))),
48 false,
49 ),
50 Field::new("width", DataType::Int32, false),
51 Field::new("height", DataType::Int32, false),
52 Field::new("fx", DataType::Float32, false),
53 Field::new("fy", DataType::Float32, false),
54 Field::new("cx", DataType::Float32, false),
55 Field::new("cy", DataType::Float32, false),
56 Field::new("timestamp_ns", DataType::Int64, false),
57 ]))
58 })
59 .clone()
60}
61
62fn list_u8(values: &[u8]) -> ListArray {
63 let inner = UInt8Array::from_iter_values(values.iter().copied());
64 let offsets = OffsetBuffer::from_lengths([values.len()]);
65 ListArray::new(
66 Arc::new(Field::new("item", DataType::UInt8, false)),
67 offsets,
68 Arc::new(inner),
69 None,
70 )
71}
72
73pub fn to_record_batch(img: &CameraRgb) -> Result<RecordBatch, arrow::error::ArrowError> {
87 let columns: Vec<ArrayRef> = vec![
88 Arc::new(list_u8(img.pixels)),
89 Arc::new(Int32Array::from_iter_values(std::iter::once(img.width))),
90 Arc::new(Int32Array::from_iter_values(std::iter::once(img.height))),
91 Arc::new(Float32Array::from_iter_values(std::iter::once(img.fx))),
92 Arc::new(Float32Array::from_iter_values(std::iter::once(img.fy))),
93 Arc::new(Float32Array::from_iter_values(std::iter::once(img.cx))),
94 Arc::new(Float32Array::from_iter_values(std::iter::once(img.cy))),
95 Arc::new(Int64Array::from_iter_values(std::iter::once(
96 img.timestamp_ns,
97 ))),
98 ];
99 RecordBatch::try_new(schema(), columns)
100}
101
102#[allow(missing_docs)]
104pub struct CameraRgbBorrowed<'a> {
105 pub pixels: &'a [u8],
106 pub width: i32,
107 pub height: i32,
108 pub fx: f32,
109 pub fy: f32,
110 pub cx: f32,
111 pub cy: f32,
112 pub timestamp_ns: i64,
113}
114
115pub fn from_struct_array_borrowed(
117 array: &StructArray,
118) -> Result<CameraRgbBorrowed<'_>, arrow::error::ArrowError> {
119 if array.is_empty() {
120 return Err(arrow::error::ArrowError::InvalidArgumentError(
121 "camera_rgb struct array is empty".into(),
122 ));
123 }
124 let pixels_list = array
125 .column(0)
126 .as_any()
127 .downcast_ref::<ListArray>()
128 .ok_or_else(|| {
129 arrow::error::ArrowError::SchemaError("camera_rgb 'pixels' not ListArray".into())
130 })?;
131 let pixels = pixels_list
132 .values()
133 .as_any()
134 .downcast_ref::<UInt8Array>()
135 .ok_or_else(|| {
136 arrow::error::ArrowError::SchemaError("camera_rgb 'pixels' inner not UInt8".into())
137 })?
138 .values();
139 Ok(CameraRgbBorrowed {
140 pixels,
141 width: scalar_i32(array, 1, "width")?,
142 height: scalar_i32(array, 2, "height")?,
143 fx: scalar_f32(array, 3, "fx")?,
144 fy: scalar_f32(array, 4, "fy")?,
145 cx: scalar_f32(array, 5, "cx")?,
146 cy: scalar_f32(array, 6, "cy")?,
147 timestamp_ns: scalar_i64(array, 7, "timestamp_ns")?,
148 })
149}
150
151pub fn from_struct_array(array: &StructArray) -> Result<CameraRgbOwned, arrow::error::ArrowError> {
168 if array.is_empty() {
169 return Err(arrow::error::ArrowError::InvalidArgumentError(
170 "camera_rgb struct array is empty".into(),
171 ));
172 }
173 let pixels_list = array
174 .column(0)
175 .as_any()
176 .downcast_ref::<ListArray>()
177 .ok_or_else(|| {
178 arrow::error::ArrowError::SchemaError("camera_rgb 'pixels' not ListArray".into())
179 })?;
180 let pixels = pixels_list
181 .values()
182 .as_any()
183 .downcast_ref::<UInt8Array>()
184 .ok_or_else(|| {
185 arrow::error::ArrowError::SchemaError("camera_rgb 'pixels' inner not UInt8".into())
186 })?
187 .values()
188 .to_vec();
189 Ok(CameraRgbOwned {
190 pixels,
191 width: scalar_i32(array, 1, "width")?,
192 height: scalar_i32(array, 2, "height")?,
193 fx: scalar_f32(array, 3, "fx")?,
194 fy: scalar_f32(array, 4, "fy")?,
195 cx: scalar_f32(array, 5, "cx")?,
196 cy: scalar_f32(array, 6, "cy")?,
197 timestamp_ns: scalar_i64(array, 7, "timestamp_ns")?,
198 })
199}
200
201fn scalar_i32(
202 array: &StructArray,
203 idx: usize,
204 name: &str,
205) -> Result<i32, arrow::error::ArrowError> {
206 array
207 .column(idx)
208 .as_any()
209 .downcast_ref::<Int32Array>()
210 .ok_or_else(|| {
211 arrow::error::ArrowError::SchemaError(format!("camera_rgb '{name}' not Int32"))
212 })
213 .map(|a| a.value(0))
214}
215
216fn scalar_f32(
217 array: &StructArray,
218 idx: usize,
219 name: &str,
220) -> Result<f32, arrow::error::ArrowError> {
221 array
222 .column(idx)
223 .as_any()
224 .downcast_ref::<Float32Array>()
225 .ok_or_else(|| {
226 arrow::error::ArrowError::SchemaError(format!("camera_rgb '{name}' not Float32"))
227 })
228 .map(|a| a.value(0))
229}
230
231fn scalar_i64(
232 array: &StructArray,
233 idx: usize,
234 name: &str,
235) -> Result<i64, arrow::error::ArrowError> {
236 array
237 .column(idx)
238 .as_any()
239 .downcast_ref::<Int64Array>()
240 .ok_or_else(|| {
241 arrow::error::ArrowError::SchemaError(format!("camera_rgb '{name}' not Int64"))
242 })
243 .map(|a| a.value(0))
244}
245
246#[cfg(test)]
247mod tests {
248 use super::*;
249 use arrow::array::Array;
250
251 #[test]
252 fn round_trips_through_record_batch() {
253 let pixels = vec![0_u8; 12]; let img = CameraRgb {
255 pixels: &pixels,
256 width: 2,
257 height: 2,
258 fx: 100.0,
259 fy: 100.0,
260 cx: 1.0,
261 cy: 1.0,
262 timestamp_ns: 42,
263 };
264 let batch = to_record_batch(&img).expect("convert");
265 assert_eq!(batch.num_rows(), 1);
266 assert_eq!(batch.num_columns(), 8);
267
268 let pixels_col = batch
269 .column(0)
270 .as_any()
271 .downcast_ref::<ListArray>()
272 .expect("pixels is ListArray");
273 let inner = pixels_col
274 .values()
275 .as_any()
276 .downcast_ref::<UInt8Array>()
277 .expect("inner is UInt8Array");
278 assert_eq!(inner.len(), 12);
279
280 let ts = batch
281 .column(7)
282 .as_any()
283 .downcast_ref::<Int64Array>()
284 .expect("timestamp_ns is Int64");
285 assert_eq!(ts.value(0), 42);
286 }
287
288 #[test]
289 fn from_struct_array_round_trips() {
290 let pixels = vec![0_u8, 64, 128, 255, 1, 2, 3, 4, 5, 6, 7, 8];
291 let img = CameraRgb {
292 pixels: &pixels,
293 width: 2,
294 height: 2,
295 fx: 100.0,
296 fy: 110.0,
297 cx: 1.0,
298 cy: 2.0,
299 timestamp_ns: 42,
300 };
301 let batch = to_record_batch(&img).expect("to");
302 let array = StructArray::from(batch);
303 let owned = from_struct_array(&array).expect("from");
304 assert_eq!(owned.pixels, pixels);
305 assert_eq!(owned.width, 2);
306 assert_eq!(owned.fy, 110.0);
307 assert_eq!(owned.timestamp_ns, 42);
308 }
309}