1use arrow_array::{Array, Int64Array, RecordBatch, StringArray};
8use facett_graph::{Scene, scene_from_labeled_edges};
9
10pub fn scene_from_batch(
14 batch: &RecordBatch,
15 src: &str,
16 dst: &str,
17 src_label: Option<&str>,
18 dst_label: Option<&str>,
19) -> Result<Scene, String> {
20 let i64c = |n: &str| batch.column_by_name(n).and_then(|c| c.as_any().downcast_ref::<Int64Array>());
21 let strc = |n: &str| batch.column_by_name(n).and_then(|c| c.as_any().downcast_ref::<StringArray>());
22
23 let s = i64c(src).ok_or_else(|| format!("no Int64 `{src}` column"))?;
24 let d = i64c(dst).ok_or_else(|| format!("no Int64 `{dst}` column"))?;
25 let sl = src_label.and_then(strc);
26 let dl = dst_label.and_then(strc);
27
28 let rows = (0..batch.num_rows())
29 .filter(|&i| !s.is_null(i) && !d.is_null(i))
30 .map(|i| {
31 (
32 s.value(i),
33 d.value(i),
34 sl.map(|a| a.value(i)).unwrap_or("").to_string(),
35 dl.map(|a| a.value(i)).unwrap_or("").to_string(),
36 )
37 });
38 Ok(scene_from_labeled_edges(rows))
39}
40
41pub fn scene_from_edge_batch(batch: &RecordBatch) -> Result<Scene, String> {
43 scene_from_batch(batch, "src", "dst", Some("sl"), Some("dl"))
44}
45
46fn cell_string(col: &dyn Array, i: usize) -> String {
48 use arrow_array::{
49 BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, StringArray, UInt32Array, UInt64Array,
50 };
51 if col.is_null(i) {
52 return String::new();
53 }
54 macro_rules! try_num {
55 ($($ty:ty),*) => {{ $(if let Some(a) = col.as_any().downcast_ref::<$ty>() { return a.value(i).to_string(); })* }};
56 }
57 try_num!(Int64Array, Int32Array, UInt64Array, UInt32Array, Float64Array, Float32Array, BooleanArray);
58 if let Some(a) = col.as_any().downcast_ref::<StringArray>() {
59 return a.value(i).to_string();
60 }
61 "?".to_string()
62}
63
64pub fn table_from_batch(batch: &RecordBatch, title: impl Into<String>) -> facett_table::Table {
68 let columns: Vec<String> = batch.schema().fields().iter().map(|f| f.name().clone()).collect();
69 let mut t = facett_table::Table::new(title, columns);
70 for r in 0..batch.num_rows() {
71 let row: Vec<String> = (0..batch.num_columns()).map(|c| cell_string(batch.column(c).as_ref(), r)).collect();
72 t.push_row(row);
73 }
74 t
75}
76
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    /// Two edges sharing a source node produce three nodes and two edges.
    #[test]
    fn scene_from_edge_batch_builds_graph() {
        let schema = Schema::new(vec![
            Field::new("src", DataType::Int64, false),
            Field::new("dst", DataType::Int64, false),
            Field::new("sl", DataType::Utf8, false),
            Field::new("dl", DataType::Utf8, false),
        ]);
        let edges = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 1])),
                Arc::new(Int64Array::from(vec![2, 3])),
                Arc::new(StringArray::from(vec!["Person", "Person"])),
                Arc::new(StringArray::from(vec!["Company", "Address"])),
            ],
        )
        .unwrap();

        let scene = scene_from_edge_batch(&edges).unwrap();

        assert_eq!(scene.nodes.len(), 3, "1, 2, 3 distinct");
        assert_eq!(scene.edges.len(), 2);
        assert_eq!(scene.nodes[0].label, "Person");
    }

    /// Column headers come from the schema and cells are stringified per row.
    #[test]
    fn table_from_batch_maps_columns_and_rows() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]);
        let records = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["knut", "korp"])),
            ],
        )
        .unwrap();

        let table = table_from_batch(&records, "repos");

        assert_eq!(table.columns, vec!["id".to_string(), "name".to_string()]);
        assert_eq!(table.rows.len(), 2);
        assert_eq!(table.rows[1], vec!["2".to_string(), "korp".to_string()]);
    }

    /// A batch without the `src`/`dst` endpoint columns is rejected.
    #[test]
    fn missing_id_column_errors() {
        let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
        let unrelated = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Int64Array::from(vec![1]))],
        )
        .unwrap();

        assert!(scene_from_edge_batch(&unrelated).is_err());
    }
}