use std::collections::HashMap;
use std::fs::{self, File};
use std::io::{BufWriter, Write};
use std::path::Path;
use chrono::NaiveDate;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::models::hypergraph::{
CrossLayerEdge, Hyperedge, HyperedgeParticipant, Hypergraph, HypergraphMetadata,
HypergraphNode, NodeBudgetReport,
};
/// Flat node record of the unified export format.
///
/// Built from a `HypergraphNode` by `RawUnifiedNode::from_hypergraph_node` and
/// written as one JSON object per line of `nodes.jsonl` by the exporter.
/// Fields are serialized in declaration order, so reordering them changes the
/// emitted JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RawUnifiedNode {
/// Node identifier, copied from `HypergraphNode::id`.
pub id: String,
/// Entity type name, copied from `HypergraphNode::entity_type`.
pub node_type: String,
/// Numeric entity type code, copied from `HypergraphNode::entity_type_code`.
pub entity_type_code: u32,
/// Numeric layer index, obtained from the node's layer via `index()`.
pub layer: u8,
/// External identifier, copied from `HypergraphNode::external_id`.
pub external_id: String,
/// Display name, copied from `HypergraphNode::label`.
pub name: String,
/// Arbitrary JSON properties; omitted from the output when empty and
/// defaulted to empty when absent on input.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub properties: HashMap<String, Value>,
/// Numeric feature vector; omitted from the output when empty and
/// defaulted to empty when absent on input.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub features: Vec<f64>,
/// Anomaly flag; defaults to `false` when absent on input.
#[serde(default)]
pub is_anomaly: bool,
/// Optional anomaly type name; omitted from the output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub anomaly_type: Option<String>,
/// Aggregate flag, copied from the source node; defaults to `false` when
/// absent on input.
#[serde(default)]
pub is_aggregate: bool,
/// Aggregate count, copied from the source node; defaults to `0` when
/// absent on input.
#[serde(default)]
pub aggregate_count: usize,
}
impl RawUnifiedNode {
pub fn from_hypergraph_node(node: &HypergraphNode) -> Self {
Self {
id: node.id.clone(),
node_type: node.entity_type.clone(),
entity_type_code: node.entity_type_code,
layer: node.layer.index(),
external_id: node.external_id.clone(),
name: node.label.clone(),
properties: node.properties.clone(),
features: node.features.clone(),
is_anomaly: node.is_anomaly,
anomaly_type: node.anomaly_type.clone(),
is_aggregate: node.is_aggregate,
aggregate_count: node.aggregate_count,
}
}
}
/// Flat pairwise edge record of the unified export format.
///
/// Built from a `CrossLayerEdge` by `RawUnifiedEdge::from_cross_layer_edge`
/// and written as one JSON object per line of `edges.jsonl` by the exporter.
/// Fields are serialized in declaration order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RawUnifiedEdge {
/// Source node identifier, copied from `CrossLayerEdge::source_id`.
pub source: String,
/// Target node identifier, copied from `CrossLayerEdge::target_id`.
pub target: String,
/// Numeric index of the source node's layer.
pub source_layer: u8,
/// Numeric index of the target node's layer.
pub target_layer: u8,
/// Edge type name, copied from `CrossLayerEdge::edge_type`.
pub edge_type: String,
/// Numeric edge type code, copied from `CrossLayerEdge::edge_type_code`.
pub edge_type_code: u32,
/// Edge weight; `from_cross_layer_edge` always sets this to `1.0`.
pub weight: f32,
/// Arbitrary JSON properties; omitted from the output when empty and
/// defaulted to empty when absent on input.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub properties: HashMap<String, Value>,
}
impl RawUnifiedEdge {
    /// Builds the flat export record for a cross-layer `edge`.
    ///
    /// Endpoint ids, type information and properties are copied from the
    /// source edge, both layers are stored as numeric indices, and the weight
    /// is fixed at `1.0`.
    pub fn from_cross_layer_edge(edge: &CrossLayerEdge) -> Self {
        let source_layer = edge.source_layer.index();
        let target_layer = edge.target_layer.index();
        let source = edge.source_id.clone();
        let target = edge.target_id.clone();

        Self {
            source,
            target,
            source_layer,
            target_layer,
            edge_type: edge.edge_type.clone(),
            edge_type_code: edge.edge_type_code,
            weight: 1.0,
            properties: edge.properties.clone(),
        }
    }
}
/// Flat hyperedge record of the unified export format.
///
/// Built from a `Hyperedge` by `RawUnifiedHyperedge::from_hyperedge` and
/// written as one JSON object per line of `hyperedges.jsonl` by the exporter.
/// Fields are serialized in declaration order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RawUnifiedHyperedge {
/// Hyperedge identifier, copied from `Hyperedge::id`.
pub id: String,
/// Hyperedge type name, copied from `Hyperedge::hyperedge_type`.
pub hyperedge_type: String,
/// Hyperedge subtype, copied from `Hyperedge::subtype`.
pub subtype: String,
/// Node ids of all participants, in participant order.
pub member_ids: Vec<String>,
/// Numeric index of the hyperedge's layer.
pub layer: u8,
/// Full participant records, cloned from the source hyperedge.
pub participants: Vec<HyperedgeParticipant>,
/// Arbitrary JSON properties; omitted from the output when empty and
/// defaulted to empty when absent on input.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub properties: HashMap<String, Value>,
/// Optional date of the hyperedge; omitted from the output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub timestamp: Option<NaiveDate>,
/// Anomaly flag; defaults to `false` when absent on input.
#[serde(default)]
pub is_anomaly: bool,
/// Optional anomaly type name; omitted from the output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub anomaly_type: Option<String>,
/// Numeric feature vector; omitted from the output when empty and
/// defaulted to empty when absent on input.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub features: Vec<f64>,
}
impl RawUnifiedHyperedge {
    /// Builds the flat export record for the hyperedge `he`.
    ///
    /// `member_ids` lists the node id of every participant in participant
    /// order; the full participant records are cloned alongside it. The layer
    /// is stored as its numeric index and all other fields are copied.
    pub fn from_hyperedge(he: &Hyperedge) -> Self {
        let mut member_ids = Vec::with_capacity(he.participants.len());
        for participant in &he.participants {
            member_ids.push(participant.node_id.clone());
        }

        Self {
            id: he.id.clone(),
            hyperedge_type: he.hyperedge_type.clone(),
            subtype: he.subtype.clone(),
            member_ids,
            layer: he.layer.index(),
            participants: he.participants.clone(),
            properties: he.properties.clone(),
            timestamp: he.timestamp,
            is_anomaly: he.is_anomaly,
            anomaly_type: he.anomaly_type.clone(),
            features: he.features.clone(),
        }
    }
}
/// Metadata document of the unified export format.
///
/// Built from a `HypergraphMetadata` by
/// `UnifiedHypergraphMetadata::from_metadata` and written to `metadata.json`
/// by the directory export. Fields are serialized in declaration order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UnifiedHypergraphMetadata {
/// Format identifier; `from_metadata` sets it to `"rustgraph_unified_v1"`.
pub format: String,
/// Name, copied from the source metadata.
pub name: String,
/// Node count, copied from the source metadata.
pub num_nodes: usize,
/// Edge count, copied from the source metadata.
pub num_edges: usize,
/// Hyperedge count, copied from the source metadata.
pub num_hyperedges: usize,
/// Node counts keyed by string, copied from the source metadata.
pub layer_node_counts: HashMap<String, usize>,
/// Node counts keyed by string, copied from the source metadata.
pub node_type_counts: HashMap<String, usize>,
/// Edge counts keyed by string, copied from the source metadata.
pub edge_type_counts: HashMap<String, usize>,
/// Hyperedge counts keyed by string, copied from the source metadata.
pub hyperedge_type_counts: HashMap<String, usize>,
/// Anomalous node count, copied from the source metadata.
pub anomalous_nodes: usize,
/// Anomalous hyperedge count, copied from the source metadata.
pub anomalous_hyperedges: usize,
/// Source string, copied from the source metadata.
pub source: String,
/// Generation timestamp string, copied from the source metadata.
pub generated_at: String,
/// Node budget report, cloned from the source metadata.
pub budget_report: NodeBudgetReport,
/// File names belonging to the export. `from_metadata` copies the source
/// list; the exporter's `export` replaces it with the four files it writes
/// and `export_to_writer` clears it.
pub files: Vec<String>,
}
impl UnifiedHypergraphMetadata {
pub fn from_metadata(meta: &HypergraphMetadata) -> Self {
Self {
format: "rustgraph_unified_v1".to_string(),
name: meta.name.clone(),
num_nodes: meta.num_nodes,
num_edges: meta.num_edges,
num_hyperedges: meta.num_hyperedges,
layer_node_counts: meta.layer_node_counts.clone(),
node_type_counts: meta.node_type_counts.clone(),
edge_type_counts: meta.edge_type_counts.clone(),
hyperedge_type_counts: meta.hyperedge_type_counts.clone(),
anomalous_nodes: meta.anomalous_nodes,
anomalous_hyperedges: meta.anomalous_hyperedges,
source: meta.source.clone(),
generated_at: meta.generated_at.clone(),
budget_report: meta.budget_report.clone(),
files: meta.files.clone(),
}
}
}
/// Options for `RustGraphUnifiedExporter`.
///
/// The `Default` value has `pretty_print` disabled.
#[derive(Debug, Clone, Default)]
pub struct UnifiedExportConfig {
/// When `true`, `metadata.json` is written pretty-printed; otherwise it is
/// written in compact form. The JSONL outputs are not affected.
pub pretty_print: bool,
}
/// Exports a `Hypergraph` in the unified format, either as a directory of
/// JSONL files plus `metadata.json` (`export`) or as a single tagged JSONL
/// stream (`export_to_writer`).
pub struct RustGraphUnifiedExporter {
// Export options supplied to `new`.
config: UnifiedExportConfig,
}
impl RustGraphUnifiedExporter {
pub fn new(config: UnifiedExportConfig) -> Self {
Self { config }
}
pub fn export(
&self,
hypergraph: &Hypergraph,
output_dir: &Path,
) -> std::io::Result<UnifiedHypergraphMetadata> {
fs::create_dir_all(output_dir)?;
let nodes_path = output_dir.join("nodes.jsonl");
let file = File::create(nodes_path)?;
let mut writer = BufWriter::with_capacity(256 * 1024, file);
for node in &hypergraph.nodes {
let unified = RawUnifiedNode::from_hypergraph_node(node);
serde_json::to_writer(&mut writer, &unified)?;
writeln!(writer)?;
}
writer.flush()?;
let edges_path = output_dir.join("edges.jsonl");
let file = File::create(edges_path)?;
let mut writer = BufWriter::with_capacity(256 * 1024, file);
for edge in &hypergraph.edges {
let unified = RawUnifiedEdge::from_cross_layer_edge(edge);
serde_json::to_writer(&mut writer, &unified)?;
writeln!(writer)?;
}
writer.flush()?;
let hyperedges_path = output_dir.join("hyperedges.jsonl");
let file = File::create(hyperedges_path)?;
let mut writer = BufWriter::with_capacity(256 * 1024, file);
for he in &hypergraph.hyperedges {
let unified = RawUnifiedHyperedge::from_hyperedge(he);
serde_json::to_writer(&mut writer, &unified)?;
writeln!(writer)?;
}
writer.flush()?;
let mut metadata = UnifiedHypergraphMetadata::from_metadata(&hypergraph.metadata);
metadata.files = vec![
"nodes.jsonl".to_string(),
"edges.jsonl".to_string(),
"hyperedges.jsonl".to_string(),
"metadata.json".to_string(),
];
let metadata_path = output_dir.join("metadata.json");
let file = File::create(metadata_path)?;
if self.config.pretty_print {
serde_json::to_writer_pretty(file, &metadata)?;
} else {
serde_json::to_writer(file, &metadata)?;
}
Ok(metadata)
}
pub fn export_to_writer<W: Write>(
&self,
hypergraph: &Hypergraph,
writer: &mut W,
) -> std::io::Result<UnifiedHypergraphMetadata> {
for node in &hypergraph.nodes {
let unified = RawUnifiedNode::from_hypergraph_node(node);
let mut obj = serde_json::to_value(&unified)?;
obj.as_object_mut()
.expect("serialized struct is always a JSON object")
.insert("_type".to_string(), Value::String("node".to_string()));
serde_json::to_writer(&mut *writer, &obj)?;
writeln!(writer)?;
}
for edge in &hypergraph.edges {
let unified = RawUnifiedEdge::from_cross_layer_edge(edge);
let mut obj = serde_json::to_value(&unified)?;
obj.as_object_mut()
.expect("serialized struct is always a JSON object")
.insert("_type".to_string(), Value::String("edge".to_string()));
serde_json::to_writer(&mut *writer, &obj)?;
writeln!(writer)?;
}
for he in &hypergraph.hyperedges {
let unified = RawUnifiedHyperedge::from_hyperedge(he);
let mut obj = serde_json::to_value(&unified)?;
obj.as_object_mut()
.expect("serialized struct is always a JSON object")
.insert("_type".to_string(), Value::String("hyperedge".to_string()));
serde_json::to_writer(&mut *writer, &obj)?;
writeln!(writer)?;
}
let mut metadata = UnifiedHypergraphMetadata::from_metadata(&hypergraph.metadata);
metadata.files = vec![];
Ok(metadata)
}
}
#[cfg(feature = "rustgraph")]
mod bulk_export {
    use super::*;
    use rustgraph_api_types::bulk::{BulkEdgeData, BulkNodeData};

    /// In-memory result of converting a hypergraph for bulk import.
    #[derive(Debug, Clone)]
    pub struct RustGraphBulkExport {
        /// One bulk node per hypergraph node, in input order.
        pub nodes: Vec<BulkNodeData>,
        /// Bulk edges whose endpoints were both found in `id_map`.
        pub edges: Vec<BulkEdgeData>,
        /// Hyperedges converted to their flat unified form.
        pub hyperedges: Vec<RawUnifiedHyperedge>,
        /// Maps each node's string id to its assigned numeric id.
        pub id_map: HashMap<String, u64>,
    }

    /// Builds the bulk node for `node` under the numeric id `numeric_id`.
    ///
    /// The node's own properties are cloned and then extended with
    /// `entity_id`, `node_type_name` and `external_id`; anomaly details are
    /// added only for anomalous nodes and `features` only when non-empty.
    fn bulk_node(numeric_id: u64, node: &HypergraphNode) -> BulkNodeData {
        let mut properties = node.properties.clone();
        properties.insert("entity_id".to_string(), Value::String(node.id.clone()));
        properties.insert(
            "node_type_name".to_string(),
            Value::String(node.entity_type.clone()),
        );
        properties.insert(
            "external_id".to_string(),
            Value::String(node.external_id.clone()),
        );

        if node.is_anomaly {
            properties.insert("is_anomaly".to_string(), Value::Bool(true));
            if let Some(kind) = &node.anomaly_type {
                properties.insert("anomaly_type".to_string(), Value::String(kind.clone()));
            }
        }

        if !node.features.is_empty() {
            let values = node.features.iter().copied().map(Value::from).collect();
            properties.insert("features".to_string(), Value::Array(values));
        }

        BulkNodeData {
            id: Some(numeric_id),
            node_type: node.entity_type_code,
            layer: Some(node.layer.index()),
            labels: vec![node.label.clone()],
            properties,
        }
    }

    /// Builds the bulk edge for `edge`, or `None` when either endpoint has no
    /// entry in `id_map`.
    fn bulk_edge(edge: &CrossLayerEdge, id_map: &HashMap<String, u64>) -> Option<BulkEdgeData> {
        let source = *id_map.get(&edge.source_id)?;
        let target = *id_map.get(&edge.target_id)?;

        let mut properties = edge.properties.clone();
        properties.insert(
            "edge_type_name".to_string(),
            Value::String(edge.edge_type.clone()),
        );

        Some(BulkEdgeData {
            source,
            target,
            edge_type: edge.edge_type_code,
            weight: 1.0,
            properties,
        })
    }

    impl RustGraphUnifiedExporter {
        /// Converts `hypergraph` into bulk-import structures.
        ///
        /// Nodes receive consecutive numeric ids starting at 1 in input order.
        /// Edges referring to a node id that is not in the resulting map are
        /// skipped. Hyperedges are converted with
        /// `RawUnifiedHyperedge::from_hyperedge`.
        pub fn to_bulk_import(&self, hypergraph: &Hypergraph) -> RustGraphBulkExport {
            let node_total = hypergraph.nodes.len();
            let mut id_map: HashMap<String, u64> = HashMap::with_capacity(node_total);
            let mut nodes = Vec::with_capacity(node_total);

            for (numeric_id, node) in (1u64..).zip(&hypergraph.nodes) {
                id_map.insert(node.id.clone(), numeric_id);
                nodes.push(bulk_node(numeric_id, node));
            }

            let mut edges = Vec::with_capacity(hypergraph.edges.len());
            edges.extend(
                hypergraph
                    .edges
                    .iter()
                    .filter_map(|edge| bulk_edge(edge, &id_map)),
            );

            let mut hyperedges: Vec<RawUnifiedHyperedge> =
                Vec::with_capacity(hypergraph.hyperedges.len());
            for he in &hypergraph.hyperedges {
                hyperedges.push(RawUnifiedHyperedge::from_hyperedge(he));
            }

            RustGraphBulkExport {
                nodes,
                edges,
                hyperedges,
                id_map,
            }
        }
    }
}
#[cfg(feature = "rustgraph")]
pub use bulk_export::RustGraphBulkExport;
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::builders::hypergraph::{HypergraphBuilder, HypergraphConfig};
use crate::models::hypergraph::HypergraphLayer;
use tempfile::tempdir;
/// Builds the shared test fixture: a hypergraph produced by adding only the
/// COSO framework, with the P2P, O2C, vendor, customer and employee options
/// switched off.
fn build_test_hypergraph() -> Hypergraph {
    let mut builder = HypergraphBuilder::new(HypergraphConfig {
        max_nodes: 1000,
        include_p2p: false,
        include_o2c: false,
        include_vendors: false,
        include_customers: false,
        include_employees: false,
        ..Default::default()
    });
    builder.add_coso_framework();
    builder.build()
}
/// A hypergraph node converts field-for-field into a `RawUnifiedNode`.
#[test]
fn test_node_conversion() {
    let source_node = HypergraphNode {
        id: "node_1".to_string(),
        entity_type: "Account".to_string(),
        entity_type_code: 100,
        layer: HypergraphLayer::AccountingNetwork,
        external_id: "1000".to_string(),
        label: "Cash".to_string(),
        properties: HashMap::new(),
        features: vec![1.0, 2.0],
        is_anomaly: false,
        anomaly_type: None,
        is_aggregate: false,
        aggregate_count: 0,
    };

    let converted = RawUnifiedNode::from_hypergraph_node(&source_node);

    assert_eq!(converted.id, "node_1");
    assert_eq!(converted.node_type, "Account");
    assert_eq!(converted.name, "Cash");
    // The accounting network layer is expected to map to index 3.
    assert_eq!(converted.layer, 3);
    assert_eq!(converted.entity_type_code, 100);
    assert_eq!(converted.external_id, "1000");
    assert_eq!(converted.features, vec![1.0, 2.0]);
}
/// A cross-layer edge converts into a `RawUnifiedEdge` with unit weight.
#[test]
fn test_edge_conversion() {
    let source_edge = CrossLayerEdge {
        source_id: "ctrl_C001".to_string(),
        source_layer: HypergraphLayer::GovernanceControls,
        target_id: "acct_1000".to_string(),
        target_layer: HypergraphLayer::AccountingNetwork,
        edge_type: "ImplementsControl".to_string(),
        edge_type_code: 40,
        properties: HashMap::new(),
    };

    let converted = RawUnifiedEdge::from_cross_layer_edge(&source_edge);

    assert_eq!(converted.source, "ctrl_C001");
    assert_eq!(converted.target, "acct_1000");
    // Governance controls -> index 1, accounting network -> index 3.
    assert_eq!(converted.source_layer, 1);
    assert_eq!(converted.target_layer, 3);
    assert_eq!(converted.edge_type, "ImplementsControl");
    assert_eq!(converted.edge_type_code, 40);
    assert_eq!(converted.weight, 1.0);
}
/// A hyperedge converts into a `RawUnifiedHyperedge` whose `member_ids`
/// mirror its participants.
#[test]
fn test_hyperedge_conversion() {
    let debit = HyperedgeParticipant {
        node_id: "acct_1000".to_string(),
        role: "debit".to_string(),
        weight: Some(500.0),
    };
    let credit = HyperedgeParticipant {
        node_id: "acct_2000".to_string(),
        role: "credit".to_string(),
        weight: Some(500.0),
    };
    let source_he = Hyperedge {
        id: "he_1".to_string(),
        hyperedge_type: "JournalEntry".to_string(),
        subtype: "R2R".to_string(),
        participants: vec![debit, credit],
        layer: HypergraphLayer::AccountingNetwork,
        properties: HashMap::new(),
        timestamp: Some(NaiveDate::from_ymd_opt(2024, 6, 15).unwrap()),
        is_anomaly: true,
        anomaly_type: Some("split_transaction".to_string()),
        features: vec![6.2, 1.0],
    };

    let converted = RawUnifiedHyperedge::from_hyperedge(&source_he);

    assert_eq!(converted.id, "he_1");
    assert_eq!(converted.hyperedge_type, "JournalEntry");
    assert_eq!(converted.layer, 3);
    assert_eq!(converted.member_ids, vec!["acct_1000", "acct_2000"]);
    assert_eq!(converted.participants.len(), 2);
    assert!(converted.is_anomaly);
    assert_eq!(converted.anomaly_type.as_deref(), Some("split_transaction"));
}
/// The directory export produces all four output files and reports the
/// expected node count and format tag.
#[test]
fn test_unified_export_creates_all_files() {
    let graph = build_test_hypergraph();
    let out_dir = tempdir().unwrap();

    let metadata = RustGraphUnifiedExporter::new(UnifiedExportConfig::default())
        .export(&graph, out_dir.path())
        .unwrap();

    for file_name in [
        "nodes.jsonl",
        "edges.jsonl",
        "hyperedges.jsonl",
        "metadata.json",
    ] {
        assert!(out_dir.path().join(file_name).exists());
    }
    assert_eq!(metadata.num_nodes, 22);
    assert_eq!(metadata.format, "rustgraph_unified_v1");
}
/// Every line of `nodes.jsonl` parses back into a well-formed
/// `RawUnifiedNode`, and there are exactly 22 of them.
#[test]
fn test_unified_nodes_jsonl_parseable() {
    let graph = build_test_hypergraph();
    let out_dir = tempdir().unwrap();
    RustGraphUnifiedExporter::new(UnifiedExportConfig::default())
        .export(&graph, out_dir.path())
        .unwrap();

    let text = std::fs::read_to_string(out_dir.path().join("nodes.jsonl")).unwrap();
    let parsed: Vec<RawUnifiedNode> = text
        .lines()
        .map(|line| serde_json::from_str(line).unwrap())
        .collect();

    for node in &parsed {
        assert!(!node.id.is_empty());
        assert!(!node.node_type.is_empty());
        assert!(!node.name.is_empty());
        assert!((1..=3).contains(&node.layer));
    }
    assert_eq!(parsed.len(), 22);
}
/// Every line of `edges.jsonl` parses into a `RawUnifiedEdge` with non-empty
/// endpoints, layer indices in 1..=3 and unit weight.
#[test]
fn test_unified_edges_jsonl_uses_source_target() {
    let graph = build_test_hypergraph();
    let out_dir = tempdir().unwrap();
    RustGraphUnifiedExporter::new(UnifiedExportConfig::default())
        .export(&graph, out_dir.path())
        .unwrap();

    let text = std::fs::read_to_string(out_dir.path().join("edges.jsonl")).unwrap();
    let valid_layers = 1..=3;
    text.lines().for_each(|line| {
        let edge: RawUnifiedEdge = serde_json::from_str(line).unwrap();
        assert!(!edge.source.is_empty());
        assert!(!edge.target.is_empty());
        assert!(valid_layers.contains(&edge.source_layer));
        assert!(valid_layers.contains(&edge.target_layer));
        assert_eq!(edge.weight, 1.0);
    });
}
/// Pretty-printed `metadata.json` round-trips and carries the expected
/// format tag and source.
#[test]
fn test_unified_metadata_has_format_field() {
    let graph = build_test_hypergraph();
    let out_dir = tempdir().unwrap();
    let pretty = UnifiedExportConfig { pretty_print: true };
    RustGraphUnifiedExporter::new(pretty)
        .export(&graph, out_dir.path())
        .unwrap();

    let metadata_path = out_dir.path().join("metadata.json");
    let text = std::fs::read_to_string(metadata_path).unwrap();
    let parsed: UnifiedHypergraphMetadata = serde_json::from_str(&text).unwrap();

    assert_eq!(parsed.format, "rustgraph_unified_v1");
    assert_eq!(parsed.source, "datasynth");
}
/// Bulk conversion yields one node per hypergraph node, numbered from 1 and
/// carrying the identifying properties.
#[cfg(feature = "rustgraph")]
#[test]
fn test_to_bulk_import_nodes() {
    let graph = build_test_hypergraph();
    let bulk = RustGraphUnifiedExporter::new(UnifiedExportConfig::default())
        .to_bulk_import(&graph);

    assert_eq!(bulk.nodes.len(), 22);
    assert_eq!(bulk.id_map.len(), 22);

    let head = &bulk.nodes[0];
    assert_eq!(head.id, Some(1));
    assert!(head.node_type > 0);
    assert!(head.layer.is_some());
    assert!(!head.labels.is_empty());
    for key in ["entity_id", "node_type_name"] {
        assert!(head.properties.contains_key(key));
    }
}
/// Bulk conversion yields edges whose endpoints are mapped node ids and which
/// carry the edge type name as a property.
#[cfg(feature = "rustgraph")]
#[test]
fn test_to_bulk_import_edges() {
    let graph = build_test_hypergraph();
    let bulk = RustGraphUnifiedExporter::new(UnifiedExportConfig::default())
        .to_bulk_import(&graph);

    assert!(!bulk.edges.is_empty());

    let is_mapped = |numeric_id: u64| bulk.id_map.values().any(|&known| known == numeric_id);
    for edge in &bulk.edges {
        assert!(is_mapped(edge.source));
        assert!(is_mapped(edge.target));
        assert!(edge.properties.contains_key("edge_type_name"));
    }
}
/// Numeric ids span 1..=node count, and `id_map` agrees with each node's
/// `entity_id` property.
#[cfg(feature = "rustgraph")]
#[test]
fn test_to_bulk_import_id_mapping() {
    let graph = build_test_hypergraph();
    let bulk = RustGraphUnifiedExporter::new(UnifiedExportConfig::default())
        .to_bulk_import(&graph);

    let smallest = bulk.nodes.iter().filter_map(|n| n.id).min();
    let largest = bulk.nodes.iter().filter_map(|n| n.id).max();
    assert_eq!(smallest, Some(1u64));
    assert_eq!(largest, Some(bulk.nodes.len() as u64));

    for node in &bulk.nodes {
        let entity_id = node
            .properties
            .get("entity_id")
            .and_then(|value| value.as_str())
            .expect("entity_id should be a string");
        assert_eq!(bulk.id_map.get(entity_id).copied(), node.id);
    }
}
/// The streaming export tags every line with a known `_type` and emits the
/// expected number of node records plus at least one edge.
#[test]
fn test_export_to_writer() {
    let graph = build_test_hypergraph();
    let mut sink = Vec::new();
    let metadata = RustGraphUnifiedExporter::new(UnifiedExportConfig::default())
        .export_to_writer(&graph, &mut sink)
        .unwrap();
    assert_eq!(metadata.num_nodes, 22);

    let text = String::from_utf8(sink).unwrap();
    let record_types: Vec<String> = text
        .lines()
        .map(|line| {
            let obj: serde_json::Value = serde_json::from_str(line).unwrap();
            obj.get("_type").unwrap().as_str().unwrap().to_string()
        })
        .collect();

    for record_type in &record_types {
        assert!(
            matches!(record_type.as_str(), "node" | "edge" | "hyperedge"),
            "Unexpected _type: {}",
            record_type
        );
    }

    let count_of = |wanted: &str| record_types.iter().filter(|t| t.as_str() == wanted).count();
    assert_eq!(count_of("node"), 22);
    assert!(count_of("edge") > 0);
}
}