facett-arrow 0.1.0

facett — Apache Arrow → Scene adapter (RecordBatch of edges → a drawable graph)
Documentation
//! **facett-arrow** — Apache Arrow → facett [`Scene`]. Turn an **edge
//! `RecordBatch`** (Int64 `src`/`dst` ids + optional Utf8 labels) straight into a
//! drawable graph. This is the "Apache Arrow integration" half of facett: any
//! Arrow source — a graph query, a join result — becomes a `Scene` with no
//! per-consumer plumbing.

use arrow_array::{Array, Int64Array, RecordBatch, StringArray};
use facett_graph::{Scene, scene_from_labeled_edges};

/// Build a [`Scene`] from an edge batch.
///
/// * `src` / `dst` — names of the endpoint id columns; both must be `Int64`.
/// * `src_label` / `dst_label` — optional names of `Utf8` label columns. A
///   label column that is not requested, is absent from the batch, or is not
///   `Utf8` contributes the empty string for every row.
///
/// Rows with a null endpoint are skipped. A null label is passed on as the
/// empty string.
///
/// # Errors
///
/// Returns a message naming the column when `src` or `dst` is missing from the
/// batch or is not an `Int64` column.
pub fn scene_from_batch(
    batch: &RecordBatch,
    src: &str,
    dst: &str,
    src_label: Option<&str>,
    dst_label: Option<&str>,
) -> Result<Scene, String> {
    /// Label for `row`, or the empty string when there is no label column or
    /// the slot is null. The null check matters: `value()` does not consult
    /// the validity bitmap, so reading a null slot yields an arbitrary value.
    fn label_at(labels: Option<&StringArray>, row: usize) -> String {
        match labels {
            Some(column) if !column.is_null(row) => column.value(row).to_string(),
            _ => String::new(),
        }
    }

    // Column lookups by name; `None` when the column is absent or has another type.
    let int64_column = |name: &str| batch.column_by_name(name).and_then(|c| c.as_any().downcast_ref::<Int64Array>());
    let utf8_column = |name: &str| batch.column_by_name(name).and_then(|c| c.as_any().downcast_ref::<StringArray>());

    let sources = int64_column(src).ok_or_else(|| format!("no Int64 `{src}` column"))?;
    let targets = int64_column(dst).ok_or_else(|| format!("no Int64 `{dst}` column"))?;
    let source_labels = src_label.and_then(utf8_column);
    let target_labels = dst_label.and_then(utf8_column);

    // Lazily yield one `(src, dst, src_label, dst_label)` tuple per row whose
    // endpoints are both non-null.
    let rows = (0..batch.num_rows())
        .filter(|&row| !sources.is_null(row) && !targets.is_null(row))
        .map(|row| {
            (
                sources.value(row),
                targets.value(row),
                label_at(source_labels, row),
                label_at(target_labels, row),
            )
        });
    Ok(scene_from_labeled_edges(rows))
}

/// Shorthand for [`scene_from_batch`] with the conventional column names:
/// endpoint ids in `src` / `dst`, labels in `sl` / `dl`.
pub fn scene_from_edge_batch(batch: &RecordBatch) -> Result<Scene, String> {
    let (src, dst) = ("src", "dst");
    let (src_label, dst_label) = (Some("sl"), Some("dl"));
    scene_from_batch(batch, src, dst, src_label, dst_label)
}

/// Format one Arrow cell as a display string.
///
/// Null cells render as the empty string. Supported column types are the
/// signed and unsigned integers (8 to 64 bit), `Float32` / `Float64`,
/// `Boolean`, and `Utf8` / `LargeUtf8`; a cell of any other type renders as `?`.
fn cell_string(col: &dyn Array, i: usize) -> String {
    use arrow_array::{
        BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray,
        StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
    };
    // Check nulls before any downcast: `value(i)` does not look at the
    // validity bitmap.
    if col.is_null(i) {
        return String::new();
    }
    // Try each concrete array type in turn and return on the first match.
    macro_rules! try_display {
        ($($ty:ty),* $(,)?) => {{
            $(
                if let Some(typed) = col.as_any().downcast_ref::<$ty>() {
                    return typed.value(i).to_string();
                }
            )*
        }};
    }
    try_display!(
        Int64Array,
        Int32Array,
        Int16Array,
        Int8Array,
        UInt64Array,
        UInt32Array,
        UInt16Array,
        UInt8Array,
        Float64Array,
        Float32Array,
        BooleanArray,
        StringArray,
        LargeStringArray,
    );
    // Unsupported column type.
    "?".to_string()
}

/// Convert an Arrow [`RecordBatch`] into a [`facett_table::Table`].
///
/// The table's column headers are the schema's field names, in schema order.
/// One table row is pushed per batch row, each cell rendered to a string by
/// `cell_string`.
pub fn table_from_batch(batch: &RecordBatch, title: impl Into<String>) -> facett_table::Table {
    let schema = batch.schema();
    let headers: Vec<String> = schema.fields().iter().map(|field| field.name().clone()).collect();

    let mut table = facett_table::Table::new(title, headers);
    for row_idx in 0..batch.num_rows() {
        let cells: Vec<String> = batch.columns().iter().map(|column| cell_string(column.as_ref(), row_idx)).collect();
        table.push_row(cells);
    }
    table
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    /// Two labelled edges leaving node 1 produce three nodes and two edges.
    #[test]
    fn scene_from_edge_batch_builds_graph() {
        let schema = Schema::new(vec![
            Field::new("src", DataType::Int64, false),
            Field::new("dst", DataType::Int64, false),
            Field::new("sl", DataType::Utf8, false),
            Field::new("dl", DataType::Utf8, false),
        ]);
        let columns: Vec<Arc<dyn Array>> = vec![
            Arc::new(Int64Array::from(vec![1, 1])),
            Arc::new(Int64Array::from(vec![2, 3])),
            Arc::new(StringArray::from(vec!["Person", "Person"])),
            Arc::new(StringArray::from(vec!["Company", "Address"])),
        ];
        let batch = RecordBatch::try_new(Arc::new(schema), columns).unwrap();

        let scene = scene_from_edge_batch(&batch).unwrap();

        assert_eq!(scene.nodes.len(), 3, "1, 2, 3 distinct");
        assert_eq!(scene.edges.len(), 2);
        assert_eq!(scene.nodes[0].label, "Person");
    }

    /// Field names become table columns and each batch row becomes a row of strings.
    #[test]
    fn table_from_batch_maps_columns_and_rows() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]);
        let columns: Vec<Arc<dyn Array>> = vec![
            Arc::new(Int64Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["knut", "korp"])),
        ];
        let batch = RecordBatch::try_new(Arc::new(schema), columns).unwrap();

        let table = table_from_batch(&batch, "repos");

        assert_eq!(table.columns, vec!["id".to_string(), "name".to_string()]);
        assert_eq!(table.rows.len(), 2);
        assert_eq!(table.rows[1], vec!["2".to_string(), "korp".to_string()]);
    }

    /// A batch without the `src` / `dst` id columns is rejected.
    #[test]
    fn missing_id_column_errors() {
        let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
        let columns: Vec<Arc<dyn Array>> = vec![Arc::new(Int64Array::from(vec![1]))];
        let batch = RecordBatch::try_new(Arc::new(schema), columns).unwrap();

        let outcome = scene_from_edge_batch(&batch);

        assert!(outcome.is_err());
    }
}