Skip to main content

facett_arrow/
lib.rs

1//! **facett-arrow** — Apache Arrow → facett [`Scene`]. Turn an **edge
2//! `RecordBatch`** (Int64 `src`/`dst` ids + optional Utf8 labels) straight into a
3//! drawable graph. This is the "Apache Arrow integration" half of facett: any
4//! Arrow source — a graph query, a join result — becomes a `Scene` with no
5//! per-consumer plumbing.
6
7use arrow_array::{Array, Int64Array, RecordBatch, StringArray};
8use facett_graph::{Scene, scene_from_labeled_edges};
9
10/// Build a [`Scene`] from an edge batch. `src`/`dst` must be `Int64`; the
11/// optional `src_label`/`dst_label` are `Utf8` (used to colour nodes). Rows with
12/// a null endpoint are skipped.
13pub fn scene_from_batch(
14    batch: &RecordBatch,
15    src: &str,
16    dst: &str,
17    src_label: Option<&str>,
18    dst_label: Option<&str>,
19) -> Result<Scene, String> {
20    let i64c = |n: &str| batch.column_by_name(n).and_then(|c| c.as_any().downcast_ref::<Int64Array>());
21    let strc = |n: &str| batch.column_by_name(n).and_then(|c| c.as_any().downcast_ref::<StringArray>());
22
23    let s = i64c(src).ok_or_else(|| format!("no Int64 `{src}` column"))?;
24    let d = i64c(dst).ok_or_else(|| format!("no Int64 `{dst}` column"))?;
25    let sl = src_label.and_then(strc);
26    let dl = dst_label.and_then(strc);
27
28    let rows = (0..batch.num_rows())
29        .filter(|&i| !s.is_null(i) && !d.is_null(i))
30        .map(|i| {
31            (
32                s.value(i),
33                d.value(i),
34                sl.map(|a| a.value(i)).unwrap_or("").to_string(),
35                dl.map(|a| a.value(i)).unwrap_or("").to_string(),
36            )
37        });
38    Ok(scene_from_labeled_edges(rows))
39}
40
41/// Convenience for the common `src`/`dst`/`sl`/`dl` column convention.
42pub fn scene_from_edge_batch(batch: &RecordBatch) -> Result<Scene, String> {
43    scene_from_batch(batch, "src", "dst", Some("sl"), Some("dl"))
44}
45
46/// Format one arrow cell as a display string (common types; `?` for the rest).
47fn cell_string(col: &dyn Array, i: usize) -> String {
48    use arrow_array::{
49        BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, StringArray, UInt32Array, UInt64Array,
50    };
51    if col.is_null(i) {
52        return String::new();
53    }
54    macro_rules! try_num {
55        ($($ty:ty),*) => {{ $(if let Some(a) = col.as_any().downcast_ref::<$ty>() { return a.value(i).to_string(); })* }};
56    }
57    try_num!(Int64Array, Int32Array, UInt64Array, UInt32Array, Float64Array, Float32Array, BooleanArray);
58    if let Some(a) = col.as_any().downcast_ref::<StringArray>() {
59        return a.value(i).to_string();
60    }
61    "?".to_string()
62}
63
64/// Turn an Arrow [`RecordBatch`] into a scrollable [`facett_table::Table`] — every
65/// column becomes a table column, cells formatted to strings. Pairs with
66/// facett-table's virtualised scroll for big batches.
67pub fn table_from_batch(batch: &RecordBatch, title: impl Into<String>) -> facett_table::Table {
68    let columns: Vec<String> = batch.schema().fields().iter().map(|f| f.name().clone()).collect();
69    let mut t = facett_table::Table::new(title, columns);
70    for r in 0..batch.num_rows() {
71        let row: Vec<String> = (0..batch.num_columns()).map(|c| cell_string(batch.column(c).as_ref(), r)).collect();
72        t.push_row(row);
73    }
74    t
75}
76
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    /// Assemble a batch of non-nullable columns from `(name, type)` pairs.
    fn batch_of(fields: &[(&str, DataType)], columns: Vec<ArrayRef>) -> RecordBatch {
        let fields: Vec<Field> = fields
            .iter()
            .map(|(name, data_type)| Field::new(*name, data_type.clone(), false))
            .collect();
        RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap()
    }

    /// Two labelled edges sharing a source produce three nodes and two edges.
    #[test]
    fn scene_from_edge_batch_builds_graph() {
        let edges = batch_of(
            &[
                ("src", DataType::Int64),
                ("dst", DataType::Int64),
                ("sl", DataType::Utf8),
                ("dl", DataType::Utf8),
            ],
            vec![
                Arc::new(Int64Array::from(vec![1, 1])),
                Arc::new(Int64Array::from(vec![2, 3])),
                Arc::new(StringArray::from(vec!["Person", "Person"])),
                Arc::new(StringArray::from(vec!["Company", "Address"])),
            ],
        );

        let scene = scene_from_edge_batch(&edges).unwrap();
        assert_eq!(scene.nodes.len(), 3, "1, 2, 3 distinct");
        assert_eq!(scene.edges.len(), 2);
        assert_eq!(scene.nodes[0].label, "Person");
    }

    /// Field names become headers and each batch row becomes a row of strings.
    #[test]
    fn table_from_batch_maps_columns_and_rows() {
        let people = batch_of(
            &[("id", DataType::Int64), ("name", DataType::Utf8)],
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["knut", "korp"])),
            ],
        );

        let table = table_from_batch(&people, "repos");
        assert_eq!(table.columns, vec!["id".to_string(), "name".into()]);
        assert_eq!(table.rows.len(), 2);
        assert_eq!(table.rows[1], vec!["2".to_string(), "korp".into()]);
    }

    /// A batch without `src`/`dst` columns is rejected rather than drawn empty.
    #[test]
    fn missing_id_column_errors() {
        let unrelated = batch_of(&[("x", DataType::Int64)], vec![Arc::new(Int64Array::from(vec![1]))]);
        assert!(scene_from_edge_batch(&unrelated).is_err());
    }
}