isaac-sim-arrow 0.1.0

Apache Arrow schema and decoders for Isaac Sim sensor and actuator records.
Documentation
// SPDX-License-Identifier: MPL-2.0
//! Arrow encoder and decoder for the per-pixel depth (float32, metres) camera channel.
use std::sync::{Arc, OnceLock};

use arrow::array::{Array, ArrayRef, Float32Array, Int32Array, Int64Array, ListArray, StructArray};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;

/// Borrowed view of a single depth camera frame, used as input to [`to_record_batch`].
#[allow(missing_docs)]
pub struct CameraDepth<'a> {
    pub depths: &'a [f32],
    pub width: i32,
    pub height: i32,
    pub fx: f32,
    pub fy: f32,
    pub cx: f32,
    pub cy: f32,
    pub timestamp_ns: i64,
}

/// Owned variant returned by [`from_struct_array`].
#[derive(Debug, Clone, PartialEq)]
#[allow(missing_docs)]
pub struct CameraDepthOwned {
    pub depths: Vec<f32>,
    pub width: i32,
    pub height: i32,
    pub fx: f32,
    pub fy: f32,
    pub cx: f32,
    pub cy: f32,
    pub timestamp_ns: i64,
}

/// Stable Arrow schema for a `CameraDepth` record batch.
pub fn schema() -> SchemaRef {
    static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
    SCHEMA
        .get_or_init(|| {
            Arc::new(Schema::new(vec![
                Field::new(
                    "depths",
                    DataType::List(Arc::new(Field::new("item", DataType::Float32, false))),
                    false,
                ),
                Field::new("width", DataType::Int32, false),
                Field::new("height", DataType::Int32, false),
                Field::new("fx", DataType::Float32, false),
                Field::new("fy", DataType::Float32, false),
                Field::new("cx", DataType::Float32, false),
                Field::new("cy", DataType::Float32, false),
                Field::new("timestamp_ns", DataType::Int64, false),
            ]))
        })
        .clone()
}

fn list_f32(values: &[f32]) -> ListArray {
    let inner = Float32Array::from_iter_values(values.iter().copied());
    let offsets = OffsetBuffer::from_lengths([values.len()]);
    ListArray::new(
        Arc::new(Field::new("item", DataType::Float32, false)),
        offsets,
        Arc::new(inner),
        None,
    )
}

/// Encode a `CameraDepth` frame as a single-row `RecordBatch` matching [`schema`].
///
/// # Example
///
/// ```
/// use isaac_sim_arrow::camera::depth::{CameraDepth, to_record_batch};
/// let depths = vec![1.5_f32; 4]; // 2x2
/// let img = CameraDepth { depths: &depths, width: 2, height: 2,
///     fx: 100.0, fy: 100.0, cx: 1.0, cy: 1.0, timestamp_ns: 10 };
/// let batch = to_record_batch(&img).unwrap();
/// assert_eq!(batch.num_rows(), 1);
/// assert_eq!(batch.num_columns(), 8);
/// ```
pub fn to_record_batch(img: &CameraDepth) -> Result<RecordBatch, arrow::error::ArrowError> {
    let columns: Vec<ArrayRef> = vec![
        Arc::new(list_f32(img.depths)),
        Arc::new(Int32Array::from_iter_values(std::iter::once(img.width))),
        Arc::new(Int32Array::from_iter_values(std::iter::once(img.height))),
        Arc::new(Float32Array::from_iter_values(std::iter::once(img.fx))),
        Arc::new(Float32Array::from_iter_values(std::iter::once(img.fy))),
        Arc::new(Float32Array::from_iter_values(std::iter::once(img.cx))),
        Arc::new(Float32Array::from_iter_values(std::iter::once(img.cy))),
        Arc::new(Int64Array::from_iter_values(std::iter::once(
            img.timestamp_ns,
        ))),
    ];
    RecordBatch::try_new(schema(), columns)
}

/// Zero-copy variant returned by [`from_struct_array_borrowed`].
#[allow(missing_docs)]
pub struct CameraDepthBorrowed<'a> {
    pub depths: &'a [f32],
    pub width: i32,
    pub height: i32,
    pub fx: f32,
    pub fy: f32,
    pub cx: f32,
    pub cy: f32,
    pub timestamp_ns: i64,
}

/// Decode the first row of a `StructArray` into a zero-copy borrowed depth view.
pub fn from_struct_array_borrowed(
    array: &StructArray,
) -> Result<CameraDepthBorrowed<'_>, arrow::error::ArrowError> {
    if array.is_empty() {
        return Err(arrow::error::ArrowError::InvalidArgumentError(
            "camera_depth struct array is empty".into(),
        ));
    }
    let depths_list = array
        .column(0)
        .as_any()
        .downcast_ref::<ListArray>()
        .ok_or_else(|| {
            arrow::error::ArrowError::SchemaError("camera_depth 'depths' not ListArray".into())
        })?;
    let depths = depths_list
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .ok_or_else(|| {
            arrow::error::ArrowError::SchemaError("camera_depth 'depths' inner not Float32".into())
        })?
        .values();
    let i32_at = |idx: usize, name: &str| -> Result<i32, arrow::error::ArrowError> {
        array
            .column(idx)
            .as_any()
            .downcast_ref::<Int32Array>()
            .ok_or_else(|| {
                arrow::error::ArrowError::SchemaError(format!("camera_depth '{name}' not Int32"))
            })
            .map(|a| a.value(0))
    };
    let f32_at = |idx: usize, name: &str| -> Result<f32, arrow::error::ArrowError> {
        array
            .column(idx)
            .as_any()
            .downcast_ref::<Float32Array>()
            .ok_or_else(|| {
                arrow::error::ArrowError::SchemaError(format!("camera_depth '{name}' not Float32"))
            })
            .map(|a| a.value(0))
    };
    Ok(CameraDepthBorrowed {
        depths,
        width: i32_at(1, "width")?,
        height: i32_at(2, "height")?,
        fx: f32_at(3, "fx")?,
        fy: f32_at(4, "fy")?,
        cx: f32_at(5, "cx")?,
        cy: f32_at(6, "cy")?,
        timestamp_ns: array
            .column(7)
            .as_any()
            .downcast_ref::<Int64Array>()
            .ok_or_else(|| {
                arrow::error::ArrowError::SchemaError(
                    "camera_depth 'timestamp_ns' not Int64".into(),
                )
            })?
            .value(0),
    })
}

/// Decode the first row of a `StructArray` into a heap-owned `CameraDepthOwned`.
///
/// # Example
///
/// ```
/// use arrow::array::StructArray;
/// use isaac_sim_arrow::camera::depth::{CameraDepth, to_record_batch, from_struct_array};
/// let depths = vec![0.5_f32, 1.0, 1.5, 2.0];
/// let img = CameraDepth { depths: &depths, width: 2, height: 2,
///     fx: 100.0, fy: 100.0, cx: 1.0, cy: 1.0, timestamp_ns: 3 };
/// let batch = to_record_batch(&img).unwrap();
/// let array = StructArray::from(batch);
/// let owned = from_struct_array(&array).unwrap();
/// assert_eq!(owned.depths, depths);
/// assert_eq!(owned.timestamp_ns, 3);
/// ```
pub fn from_struct_array(
    array: &StructArray,
) -> Result<CameraDepthOwned, arrow::error::ArrowError> {
    if array.is_empty() {
        return Err(arrow::error::ArrowError::InvalidArgumentError(
            "camera_depth struct array is empty".into(),
        ));
    }
    let depths_list = array
        .column(0)
        .as_any()
        .downcast_ref::<ListArray>()
        .ok_or_else(|| {
            arrow::error::ArrowError::SchemaError("camera_depth 'depths' not ListArray".into())
        })?;
    let depths = depths_list
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .ok_or_else(|| {
            arrow::error::ArrowError::SchemaError("camera_depth 'depths' inner not Float32".into())
        })?
        .values()
        .to_vec();
    let i32_at = |idx: usize, name: &str| -> Result<i32, arrow::error::ArrowError> {
        array
            .column(idx)
            .as_any()
            .downcast_ref::<Int32Array>()
            .ok_or_else(|| {
                arrow::error::ArrowError::SchemaError(format!("camera_depth '{name}' not Int32"))
            })
            .map(|a| a.value(0))
    };
    let f32_at = |idx: usize, name: &str| -> Result<f32, arrow::error::ArrowError> {
        array
            .column(idx)
            .as_any()
            .downcast_ref::<Float32Array>()
            .ok_or_else(|| {
                arrow::error::ArrowError::SchemaError(format!("camera_depth '{name}' not Float32"))
            })
            .map(|a| a.value(0))
    };
    Ok(CameraDepthOwned {
        depths,
        width: i32_at(1, "width")?,
        height: i32_at(2, "height")?,
        fx: f32_at(3, "fx")?,
        fy: f32_at(4, "fy")?,
        cx: f32_at(5, "cx")?,
        cy: f32_at(6, "cy")?,
        timestamp_ns: array
            .column(7)
            .as_any()
            .downcast_ref::<Int64Array>()
            .ok_or_else(|| {
                arrow::error::ArrowError::SchemaError(
                    "camera_depth 'timestamp_ns' not Int64".into(),
                )
            })?
            .value(0),
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::Array;

    #[test]
    fn round_trips_through_record_batch() {
        let depths = vec![0.5_f32; 4]; // 2x2
        let img = CameraDepth {
            depths: &depths,
            width: 2,
            height: 2,
            fx: 100.0,
            fy: 100.0,
            cx: 1.0,
            cy: 1.0,
            timestamp_ns: 99,
        };
        let batch = to_record_batch(&img).expect("convert");
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 8);

        let depths_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<ListArray>()
            .expect("depths is ListArray");
        let inner = depths_col
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .expect("inner is Float32Array");
        assert_eq!(inner.len(), 4);

        let ts = batch
            .column(7)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("timestamp_ns is Int64");
        assert_eq!(ts.value(0), 99);
    }

    #[test]
    fn from_struct_array_round_trips() {
        let depths = vec![0.5_f32, 1.0, 1.5, 2.0];
        let img = CameraDepth {
            depths: &depths,
            width: 2,
            height: 2,
            fx: 100.0,
            fy: 100.0,
            cx: 1.0,
            cy: 1.0,
            timestamp_ns: 99,
        };
        let batch = to_record_batch(&img).expect("to");
        let array = StructArray::from(batch);
        let owned = from_struct_array(&array).expect("from");
        assert_eq!(owned.depths, depths);
        assert_eq!(owned.width, 2);
        assert_eq!(owned.timestamp_ns, 99);
    }
}