use arrow_array::{Array, Int64Array, RecordBatch, StringArray};
use facett_graph::{Scene, scene_from_labeled_edges};
/// Builds a [`Scene`] from the edge rows of an Arrow [`RecordBatch`].
///
/// * `src` / `dst` name the columns holding the two endpoint ids of each edge;
///   both must be `Int64Array` columns.
/// * `src_label` / `dst_label` optionally name `StringArray` columns holding a
///   label for the corresponding endpoint.
///
/// Rows in which either endpoint is null are skipped. An endpoint gets an empty
/// label when no label column was requested, when the named column is absent or
/// is not a `StringArray`, or when the label cell itself is null.
///
/// # Errors
///
/// Returns a message naming the column when `src` or `dst` is missing from the
/// batch or is not an `Int64Array`.
pub fn scene_from_batch(
    batch: &RecordBatch,
    src: &str,
    dst: &str,
    src_label: Option<&str>,
    dst_label: Option<&str>,
) -> Result<Scene, String> {
    /// Label text for `row`, or an empty string when there is no label column
    /// or the cell is null.
    fn label_at(labels: Option<&StringArray>, row: usize) -> String {
        match labels {
            // `value` does not consult the validity bitmap, so nulls are
            // filtered out here instead of reading an unspecified value.
            Some(col) if !col.is_null(row) => col.value(row).to_string(),
            _ => String::new(),
        }
    }

    let int_column = |name: &str| {
        batch
            .column_by_name(name)
            .and_then(|col| col.as_any().downcast_ref::<Int64Array>())
    };
    let str_column = |name: &str| {
        batch
            .column_by_name(name)
            .and_then(|col| col.as_any().downcast_ref::<StringArray>())
    };

    let sources = int_column(src).ok_or_else(|| format!("no Int64 `{src}` column"))?;
    let targets = int_column(dst).ok_or_else(|| format!("no Int64 `{dst}` column"))?;
    // Label columns are best-effort: a missing or mistyped column is not an error.
    let source_labels = src_label.and_then(str_column);
    let target_labels = dst_label.and_then(str_column);

    let rows = (0..batch.num_rows())
        // An edge needs both endpoints; drop rows where either id is null.
        .filter(|&row| !sources.is_null(row) && !targets.is_null(row))
        .map(|row| {
            (
                sources.value(row),
                targets.value(row),
                label_at(source_labels, row),
                label_at(target_labels, row),
            )
        });
    Ok(scene_from_labeled_edges(rows))
}
/// Builds a [`Scene`] from a batch that uses the fixed column names `src` and
/// `dst` for the edge endpoints and `sl` / `dl` for their labels.
///
/// This forwards to [`scene_from_batch`] and returns its result unchanged.
pub fn scene_from_edge_batch(batch: &RecordBatch) -> Result<Scene, String> {
    let (src_column, dst_column) = ("src", "dst");
    let (src_label_column, dst_label_column) = (Some("sl"), Some("dl"));
    scene_from_batch(
        batch,
        src_column,
        dst_column,
        src_label_column,
        dst_label_column,
    )
}
/// Renders the cell at row `i` of `col` as display text.
///
/// * A null cell renders as an empty string.
/// * Signed and unsigned integers (8 to 64 bit), 32/64-bit floats, booleans and
///   UTF-8 strings (`StringArray` and `LargeStringArray`) render via
///   `to_string()` on the cell value.
/// * Any other array type renders as `"?"`.
fn cell_string(col: &dyn Array, i: usize) -> String {
    use arrow_array::{
        BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
        LargeStringArray, StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
    };

    // Checked first so that `value(i)` below is never read for a null slot.
    if col.is_null(i) {
        return String::new();
    }

    // Tries each concrete array type in turn and returns from `cell_string`
    // on the first successful downcast.
    macro_rules! render_as {
        ($($ty:ty),* $(,)?) => {{
            $(
                if let Some(typed) = col.as_any().downcast_ref::<$ty>() {
                    return typed.value(i).to_string();
                }
            )*
        }};
    }
    render_as!(
        Int64Array,
        Int32Array,
        Int16Array,
        Int8Array,
        UInt64Array,
        UInt32Array,
        UInt16Array,
        UInt8Array,
        Float64Array,
        Float32Array,
        BooleanArray,
        StringArray,
        LargeStringArray,
    );

    // Placeholder for array types this function does not know how to render.
    "?".to_string()
}
/// Converts an Arrow [`RecordBatch`] into a `facett_table::Table` of strings.
///
/// The table's column names are the schema field names, in schema order. Each
/// batch row becomes one table row whose cells are produced by `cell_string`.
pub fn table_from_batch(batch: &RecordBatch, title: impl Into<String>) -> facett_table::Table {
    let schema = batch.schema();
    let mut headers: Vec<String> = Vec::with_capacity(schema.fields().len());
    for field in schema.fields().iter() {
        headers.push(field.name().clone());
    }

    let mut table = facett_table::Table::new(title, headers);
    for row_idx in 0..batch.num_rows() {
        let mut cells: Vec<String> = Vec::with_capacity(batch.num_columns());
        for column in batch.columns().iter() {
            cells.push(cell_string(column.as_ref(), row_idx));
        }
        table.push_row(cells);
    }
    table
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    /// Wraps `values` in a type-erased `Int64Array` column.
    fn int_col(values: Vec<i64>) -> Arc<dyn Array> {
        Arc::new(Int64Array::from(values))
    }

    /// Wraps `values` in a type-erased `StringArray` column.
    fn str_col(values: Vec<&str>) -> Arc<dyn Array> {
        Arc::new(StringArray::from(values))
    }

    /// Builds a batch of non-nullable columns from `(name, type, data)` triples.
    fn make_batch(columns: Vec<(&str, DataType, Arc<dyn Array>)>) -> RecordBatch {
        let (fields, arrays): (Vec<Field>, Vec<Arc<dyn Array>>) = columns
            .into_iter()
            .map(|(name, data_type, data)| (Field::new(name, data_type, false), data))
            .unzip();
        RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).unwrap()
    }

    #[test]
    fn scene_from_edge_batch_builds_graph() {
        let batch = make_batch(vec![
            ("src", DataType::Int64, int_col(vec![1, 1])),
            ("dst", DataType::Int64, int_col(vec![2, 3])),
            ("sl", DataType::Utf8, str_col(vec!["Person", "Person"])),
            ("dl", DataType::Utf8, str_col(vec!["Company", "Address"])),
        ]);

        let scene = scene_from_edge_batch(&batch).unwrap();

        assert_eq!(scene.nodes.len(), 3, "1, 2, 3 distinct");
        assert_eq!(scene.edges.len(), 2);
        assert_eq!(scene.nodes[0].label, "Person");
    }

    #[test]
    fn table_from_batch_maps_columns_and_rows() {
        let batch = make_batch(vec![
            ("id", DataType::Int64, int_col(vec![1, 2])),
            ("name", DataType::Utf8, str_col(vec!["knut", "korp"])),
        ]);

        let t = table_from_batch(&batch, "repos");

        assert_eq!(t.columns, vec!["id".to_string(), "name".to_string()]);
        assert_eq!(t.rows.len(), 2);
        assert_eq!(t.rows[1], vec!["2".to_string(), "korp".to_string()]);
    }

    #[test]
    fn missing_id_column_errors() {
        // Neither `src` nor `dst` exists, so the endpoint lookup must fail.
        let batch = make_batch(vec![("x", DataType::Int64, int_col(vec![1]))]);

        assert!(scene_from_edge_batch(&batch).is_err());
    }
}