isaac-sim-arrow 0.1.0

Apache Arrow schema and decoders for Isaac Sim sensor and actuator records.
Documentation
// SPDX-License-Identifier: MPL-2.0
//! Arrow encoder and decoder for the 3D RTX LiDAR PointCloud channel.
use std::sync::{Arc, OnceLock};

use arrow::array::{Array, ArrayRef, Float32Array, Int32Array, ListArray, StructArray};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;

/// Borrowed view of a single 3D LiDAR PointCloud frame, used as input to [`to_record_batch`].
#[allow(missing_docs)]
pub struct LidarPointCloud<'a> {
    pub points: &'a [f32],
    pub num_points: i32,
    pub width: i32,
    pub height: i32,
}

/// Owned variant returned by [`from_struct_array`]. Holds heap-owned
/// payload so a downstream node can keep the decoded value across the next event.
#[derive(Debug, Clone, PartialEq)]
#[allow(missing_docs)]
pub struct LidarPointCloudOwned {
    pub points: Vec<f32>,
    pub num_points: i32,
    pub width: i32,
    pub height: i32,
}

/// Stable Arrow schema for a `LidarPointCloud` record batch.
pub fn schema() -> SchemaRef {
    static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
    SCHEMA
        .get_or_init(|| {
            Arc::new(Schema::new(vec![
                Field::new(
                    "points",
                    DataType::List(Arc::new(Field::new("item", DataType::Float32, false))),
                    false,
                ),
                Field::new("num_points", DataType::Int32, false),
                Field::new("width", DataType::Int32, false),
                Field::new("height", DataType::Int32, false),
            ]))
        })
        .clone()
}

fn list_f32(values: &[f32]) -> ListArray {
    let inner = Float32Array::from_iter_values(values.iter().copied());
    let offsets = OffsetBuffer::from_lengths([values.len()]);
    ListArray::new(
        Arc::new(Field::new("item", DataType::Float32, false)),
        offsets,
        Arc::new(inner),
        None,
    )
}

/// Encode a `LidarPointCloud` frame as a single-row `RecordBatch` matching [`schema`].
/// The flat `[x, y, z, ...]` points buffer is stored as an Arrow `List<Float32>` column.
///
/// # Example
///
/// ```
/// use isaac_sim_arrow::lidar::pointcloud::{LidarPointCloud, to_record_batch};
/// let points = [1.0_f32, 0.0, 0.0, 0.0, 1.0, 0.0];
/// let pc = LidarPointCloud { points: &points, num_points: 2, width: 2, height: 1 };
/// let batch = to_record_batch(&pc).unwrap();
/// assert_eq!(batch.num_rows(), 1);
/// assert_eq!(batch.num_columns(), 4);
/// ```
pub fn to_record_batch(pc: &LidarPointCloud) -> Result<RecordBatch, arrow::error::ArrowError> {
    let columns: Vec<ArrayRef> = vec![
        Arc::new(list_f32(pc.points)),
        Arc::new(Int32Array::from_iter_values(std::iter::once(pc.num_points))),
        Arc::new(Int32Array::from_iter_values(std::iter::once(pc.width))),
        Arc::new(Int32Array::from_iter_values(std::iter::once(pc.height))),
    ];
    RecordBatch::try_new(schema(), columns)
}

/// Zero-copy variant returned by [`from_struct_array_borrowed`]; the
/// `points` slice borrows directly from the Arrow buffer.
#[allow(missing_docs)]
pub struct LidarPointCloudBorrowed<'a> {
    pub points: &'a [f32],
    pub num_points: i32,
    pub width: i32,
    pub height: i32,
}

/// Decode the first row of a `StructArray` into a borrowed view (zero-copy points slice).
/// The returned struct borrows from `array`; use [`from_struct_array`] for an owned copy.
pub fn from_struct_array_borrowed(
    array: &StructArray,
) -> Result<LidarPointCloudBorrowed<'_>, arrow::error::ArrowError> {
    if array.is_empty() {
        return Err(arrow::error::ArrowError::InvalidArgumentError(
            "lidar_pointcloud struct array is empty".into(),
        ));
    }
    let pts_list = array
        .column(0)
        .as_any()
        .downcast_ref::<ListArray>()
        .ok_or_else(|| {
            arrow::error::ArrowError::SchemaError("pointcloud 'points' not ListArray".into())
        })?;
    let points = pts_list
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .ok_or_else(|| {
            arrow::error::ArrowError::SchemaError("pointcloud 'points' inner not Float32".into())
        })?
        .values();
    let i32_at = |idx: usize, name: &str| -> Result<i32, arrow::error::ArrowError> {
        array
            .column(idx)
            .as_any()
            .downcast_ref::<Int32Array>()
            .ok_or_else(|| {
                arrow::error::ArrowError::SchemaError(format!("pointcloud '{name}' not Int32"))
            })
            .map(|a| a.value(0))
    };
    Ok(LidarPointCloudBorrowed {
        points,
        num_points: i32_at(1, "num_points")?,
        width: i32_at(2, "width")?,
        height: i32_at(3, "height")?,
    })
}

/// Decode the first row of a `StructArray` into a heap-owned `LidarPointCloudOwned`.
///
/// # Example
///
/// ```
/// use arrow::array::StructArray;
/// use isaac_sim_arrow::lidar::pointcloud::{LidarPointCloud, to_record_batch, from_struct_array};
/// let points = [1.0_f32, 0.0, 0.0];
/// let pc = LidarPointCloud { points: &points, num_points: 1, width: 1, height: 1 };
/// let batch = to_record_batch(&pc).unwrap();
/// let array = StructArray::from(batch);
/// let owned = from_struct_array(&array).unwrap();
/// assert_eq!(owned.num_points, 1);
/// assert_eq!(&owned.points, &[1.0_f32, 0.0, 0.0]);
/// ```
pub fn from_struct_array(
    array: &StructArray,
) -> Result<LidarPointCloudOwned, arrow::error::ArrowError> {
    if array.is_empty() {
        return Err(arrow::error::ArrowError::InvalidArgumentError(
            "lidar_pointcloud struct array is empty".into(),
        ));
    }
    let pts_list = array
        .column(0)
        .as_any()
        .downcast_ref::<ListArray>()
        .ok_or_else(|| {
            arrow::error::ArrowError::SchemaError("pointcloud 'points' not ListArray".into())
        })?;
    let pts_inner = pts_list
        .values()
        .as_any()
        .downcast_ref::<Float32Array>()
        .ok_or_else(|| {
            arrow::error::ArrowError::SchemaError("pointcloud 'points' inner not Float32".into())
        })?;
    let i32_at = |idx: usize, name: &str| -> Result<i32, arrow::error::ArrowError> {
        array
            .column(idx)
            .as_any()
            .downcast_ref::<Int32Array>()
            .ok_or_else(|| {
                arrow::error::ArrowError::SchemaError(format!("pointcloud '{name}' not Int32"))
            })
            .map(|a| a.value(0))
    };
    Ok(LidarPointCloudOwned {
        points: pts_inner.values().to_vec(),
        num_points: i32_at(1, "num_points")?,
        width: i32_at(2, "width")?,
        height: i32_at(3, "height")?,
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn round_trips_through_record_batch() {
        let points = [
            1.0_f32, 0.0, 0.0, //
            0.0, 1.0, 0.0, //
            0.0, 0.0, 1.0,
        ];
        let pc = LidarPointCloud {
            points: &points,
            num_points: 3,
            width: 3,
            height: 1,
        };
        let batch = to_record_batch(&pc).expect("convert");
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 4);

        let pts_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<ListArray>()
            .expect("points is ListArray");
        let inner = pts_col
            .values()
            .as_any()
            .downcast_ref::<Float32Array>()
            .expect("inner is Float32Array");
        assert_eq!(inner.len(), 9);
        assert_eq!(inner.value(0), 1.0);
        assert_eq!(inner.value(8), 1.0);
    }

    #[test]
    fn from_struct_array_round_trips() {
        let points = [1.0_f32, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0];
        let pc = LidarPointCloud {
            points: &points,
            num_points: 3,
            width: 3,
            height: 1,
        };
        let batch = to_record_batch(&pc).expect("to");
        let array = StructArray::from(batch);
        let owned = from_struct_array(&array).expect("from");
        assert_eq!(owned.points, points);
        assert_eq!(owned.num_points, 3);
    }
}