1use std::sync::{Arc, OnceLock};
4
5use arrow::array::{Array, ArrayRef, Float32Array, Int32Array, Int64Array, ListArray, StructArray};
6use arrow::buffer::OffsetBuffer;
7use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
8use arrow::record_batch::RecordBatch;
9
/// Borrowed description of one depth frame, the input of [`to_record_batch`].
///
/// Every field is written verbatim to the schema column of the same name; no
/// consistency between the fields is checked in this module.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct CameraDepth<'a> {
    /// Depth samples, stored as the single list entry of the `depths` column.
    ///
    /// NOTE(review): presumably `width * height` values — nothing here enforces it.
    pub depths: &'a [f32],
    /// Value of the `width` column (presumably the image width in pixels — confirm).
    pub width: i32,
    /// Value of the `height` column (presumably the image height in pixels — confirm).
    pub height: i32,
    /// Value of the `fx` column (presumably the horizontal focal length — confirm).
    pub fx: f32,
    /// Value of the `fy` column (presumably the vertical focal length — confirm).
    pub fy: f32,
    /// Value of the `cx` column (presumably the principal point x coordinate — confirm).
    pub cx: f32,
    /// Value of the `cy` column (presumably the principal point y coordinate — confirm).
    pub cy: f32,
    /// Value of the `timestamp_ns` column (by its name, a timestamp in nanoseconds).
    pub timestamp_ns: i64,
}
22
/// Owned description of one depth frame, the result of [`from_struct_array`].
///
/// Each field holds the row-0 value of the schema column with the same name.
#[derive(Debug, Clone, PartialEq)]
pub struct CameraDepthOwned {
    /// Depth samples copied out of the `depths` list column.
    ///
    /// NOTE(review): presumably `width * height` values — nothing here enforces it.
    pub depths: Vec<f32>,
    /// Value of the `width` column (presumably the image width in pixels — confirm).
    pub width: i32,
    /// Value of the `height` column (presumably the image height in pixels — confirm).
    pub height: i32,
    /// Value of the `fx` column (presumably the horizontal focal length — confirm).
    pub fx: f32,
    /// Value of the `fy` column (presumably the vertical focal length — confirm).
    pub fy: f32,
    /// Value of the `cx` column (presumably the principal point x coordinate — confirm).
    pub cx: f32,
    /// Value of the `cy` column (presumably the principal point y coordinate — confirm).
    pub cy: f32,
    /// Value of the `timestamp_ns` column (by its name, a timestamp in nanoseconds).
    pub timestamp_ns: i64,
}
36
37pub fn schema() -> SchemaRef {
39 static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
40 SCHEMA
41 .get_or_init(|| {
42 Arc::new(Schema::new(vec![
43 Field::new(
44 "depths",
45 DataType::List(Arc::new(Field::new("item", DataType::Float32, false))),
46 false,
47 ),
48 Field::new("width", DataType::Int32, false),
49 Field::new("height", DataType::Int32, false),
50 Field::new("fx", DataType::Float32, false),
51 Field::new("fy", DataType::Float32, false),
52 Field::new("cx", DataType::Float32, false),
53 Field::new("cy", DataType::Float32, false),
54 Field::new("timestamp_ns", DataType::Int64, false),
55 ]))
56 })
57 .clone()
58}
59
60fn list_f32(values: &[f32]) -> ListArray {
61 let inner = Float32Array::from_iter_values(values.iter().copied());
62 let offsets = OffsetBuffer::from_lengths([values.len()]);
63 ListArray::new(
64 Arc::new(Field::new("item", DataType::Float32, false)),
65 offsets,
66 Arc::new(inner),
67 None,
68 )
69}
70
71pub fn to_record_batch(img: &CameraDepth) -> Result<RecordBatch, arrow::error::ArrowError> {
85 let columns: Vec<ArrayRef> = vec![
86 Arc::new(list_f32(img.depths)),
87 Arc::new(Int32Array::from_iter_values(std::iter::once(img.width))),
88 Arc::new(Int32Array::from_iter_values(std::iter::once(img.height))),
89 Arc::new(Float32Array::from_iter_values(std::iter::once(img.fx))),
90 Arc::new(Float32Array::from_iter_values(std::iter::once(img.fy))),
91 Arc::new(Float32Array::from_iter_values(std::iter::once(img.cx))),
92 Arc::new(Float32Array::from_iter_values(std::iter::once(img.cy))),
93 Arc::new(Int64Array::from_iter_values(std::iter::once(
94 img.timestamp_ns,
95 ))),
96 ];
97 RecordBatch::try_new(schema(), columns)
98}
99
/// Zero-copy view of one depth frame, the result of [`from_struct_array_borrowed`].
///
/// `depths` borrows from the struct array that was decoded, so the view cannot
/// outlive that array. Each scalar field holds the row-0 value of the schema
/// column with the same name.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct CameraDepthBorrowed<'a> {
    /// Depth samples, borrowed from the child values of the `depths` list column.
    ///
    /// NOTE(review): presumably `width * height` values — nothing here enforces it.
    pub depths: &'a [f32],
    /// Value of the `width` column (presumably the image width in pixels — confirm).
    pub width: i32,
    /// Value of the `height` column (presumably the image height in pixels — confirm).
    pub height: i32,
    /// Value of the `fx` column (presumably the horizontal focal length — confirm).
    pub fx: f32,
    /// Value of the `fy` column (presumably the vertical focal length — confirm).
    pub fy: f32,
    /// Value of the `cx` column (presumably the principal point x coordinate — confirm).
    pub cx: f32,
    /// Value of the `cy` column (presumably the principal point y coordinate — confirm).
    pub cy: f32,
    /// Value of the `timestamp_ns` column (by its name, a timestamp in nanoseconds).
    pub timestamp_ns: i64,
}
112
113pub fn from_struct_array_borrowed(
115 array: &StructArray,
116) -> Result<CameraDepthBorrowed<'_>, arrow::error::ArrowError> {
117 if array.is_empty() {
118 return Err(arrow::error::ArrowError::InvalidArgumentError(
119 "camera_depth struct array is empty".into(),
120 ));
121 }
122 let depths_list = array
123 .column(0)
124 .as_any()
125 .downcast_ref::<ListArray>()
126 .ok_or_else(|| {
127 arrow::error::ArrowError::SchemaError("camera_depth 'depths' not ListArray".into())
128 })?;
129 let depths = depths_list
130 .values()
131 .as_any()
132 .downcast_ref::<Float32Array>()
133 .ok_or_else(|| {
134 arrow::error::ArrowError::SchemaError("camera_depth 'depths' inner not Float32".into())
135 })?
136 .values();
137 let i32_at = |idx: usize, name: &str| -> Result<i32, arrow::error::ArrowError> {
138 array
139 .column(idx)
140 .as_any()
141 .downcast_ref::<Int32Array>()
142 .ok_or_else(|| {
143 arrow::error::ArrowError::SchemaError(format!("camera_depth '{name}' not Int32"))
144 })
145 .map(|a| a.value(0))
146 };
147 let f32_at = |idx: usize, name: &str| -> Result<f32, arrow::error::ArrowError> {
148 array
149 .column(idx)
150 .as_any()
151 .downcast_ref::<Float32Array>()
152 .ok_or_else(|| {
153 arrow::error::ArrowError::SchemaError(format!("camera_depth '{name}' not Float32"))
154 })
155 .map(|a| a.value(0))
156 };
157 Ok(CameraDepthBorrowed {
158 depths,
159 width: i32_at(1, "width")?,
160 height: i32_at(2, "height")?,
161 fx: f32_at(3, "fx")?,
162 fy: f32_at(4, "fy")?,
163 cx: f32_at(5, "cx")?,
164 cy: f32_at(6, "cy")?,
165 timestamp_ns: array
166 .column(7)
167 .as_any()
168 .downcast_ref::<Int64Array>()
169 .ok_or_else(|| {
170 arrow::error::ArrowError::SchemaError(
171 "camera_depth 'timestamp_ns' not Int64".into(),
172 )
173 })?
174 .value(0),
175 })
176}
177
178pub fn from_struct_array(
195 array: &StructArray,
196) -> Result<CameraDepthOwned, arrow::error::ArrowError> {
197 if array.is_empty() {
198 return Err(arrow::error::ArrowError::InvalidArgumentError(
199 "camera_depth struct array is empty".into(),
200 ));
201 }
202 let depths_list = array
203 .column(0)
204 .as_any()
205 .downcast_ref::<ListArray>()
206 .ok_or_else(|| {
207 arrow::error::ArrowError::SchemaError("camera_depth 'depths' not ListArray".into())
208 })?;
209 let depths = depths_list
210 .values()
211 .as_any()
212 .downcast_ref::<Float32Array>()
213 .ok_or_else(|| {
214 arrow::error::ArrowError::SchemaError("camera_depth 'depths' inner not Float32".into())
215 })?
216 .values()
217 .to_vec();
218 let i32_at = |idx: usize, name: &str| -> Result<i32, arrow::error::ArrowError> {
219 array
220 .column(idx)
221 .as_any()
222 .downcast_ref::<Int32Array>()
223 .ok_or_else(|| {
224 arrow::error::ArrowError::SchemaError(format!("camera_depth '{name}' not Int32"))
225 })
226 .map(|a| a.value(0))
227 };
228 let f32_at = |idx: usize, name: &str| -> Result<f32, arrow::error::ArrowError> {
229 array
230 .column(idx)
231 .as_any()
232 .downcast_ref::<Float32Array>()
233 .ok_or_else(|| {
234 arrow::error::ArrowError::SchemaError(format!("camera_depth '{name}' not Float32"))
235 })
236 .map(|a| a.value(0))
237 };
238 Ok(CameraDepthOwned {
239 depths,
240 width: i32_at(1, "width")?,
241 height: i32_at(2, "height")?,
242 fx: f32_at(3, "fx")?,
243 fy: f32_at(4, "fy")?,
244 cx: f32_at(5, "cx")?,
245 cy: f32_at(6, "cy")?,
246 timestamp_ns: array
247 .column(7)
248 .as_any()
249 .downcast_ref::<Int64Array>()
250 .ok_or_else(|| {
251 arrow::error::ArrowError::SchemaError(
252 "camera_depth 'timestamp_ns' not Int64".into(),
253 )
254 })?
255 .value(0),
256 })
257}
258
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::Array;

    /// Builds the 2x2 frame shared by the tests.
    ///
    /// `fx`/`fy` and `cx`/`cy` differ on purpose so that swapped columns are caught.
    fn sample(depths: &[f32]) -> CameraDepth<'_> {
        CameraDepth {
            depths,
            width: 2,
            height: 2,
            fx: 100.0,
            fy: 101.0,
            cx: 1.0,
            cy: 1.5,
            timestamp_ns: 99,
        }
    }

    #[test]
    fn round_trips_through_record_batch() {
        let depths = vec![0.5_f32; 4];
        let img = sample(&depths);
        let batch = to_record_batch(&img).expect("convert");
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 8);

        let depths_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<ListArray>()
            .expect("depths is ListArray");
        let inner = depths_col
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .expect("inner is Float32Array");
        assert_eq!(inner.len(), 4);

        let ts = batch
            .column(7)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("timestamp_ns is Int64");
        assert_eq!(ts.value(0), 99);
    }

    #[test]
    fn from_struct_array_round_trips() {
        let depths = vec![0.5_f32, 1.0, 1.5, 2.0];
        let img = sample(&depths);
        let batch = to_record_batch(&img).expect("to");
        let array = StructArray::from(batch);
        let owned = from_struct_array(&array).expect("from");
        // Compare the whole value so every column is covered.
        assert_eq!(
            owned,
            CameraDepthOwned {
                depths: depths.clone(),
                width: 2,
                height: 2,
                fx: 100.0,
                fy: 101.0,
                cx: 1.0,
                cy: 1.5,
                timestamp_ns: 99,
            }
        );
    }

    #[test]
    fn from_struct_array_borrowed_round_trips() {
        let depths = vec![0.5_f32, 1.0, 1.5, 2.0];
        let img = sample(&depths);
        let batch = to_record_batch(&img).expect("to");
        let array = StructArray::from(batch);
        let borrowed = from_struct_array_borrowed(&array).expect("from");
        assert_eq!(borrowed.depths, depths.as_slice());
        assert_eq!(borrowed.width, 2);
        assert_eq!(borrowed.height, 2);
        assert_eq!(borrowed.fx, 100.0);
        assert_eq!(borrowed.fy, 101.0);
        assert_eq!(borrowed.cx, 1.0);
        assert_eq!(borrowed.cy, 1.5);
        assert_eq!(borrowed.timestamp_ns, 99);
    }

    #[test]
    fn empty_struct_array_is_rejected() {
        let empty = StructArray::from(RecordBatch::new_empty(schema()));
        assert!(matches!(
            from_struct_array(&empty),
            Err(arrow::error::ArrowError::InvalidArgumentError(_))
        ));
        assert!(matches!(
            from_struct_array_borrowed(&empty),
            Err(arrow::error::ArrowError::InvalidArgumentError(_))
        ));
    }
}