use crate::datatypes::{DataFrame, Value};
use crate::graph::introspection::reporting::{ConnectionOperationReport, NodeOperationReport};
use crate::graph::mutation::maintain::{add_connections, add_nodes};
use crate::graph::schema::DirGraph;
use crate::graph::storage::GraphRead;
use std::collections::HashMap;
/// Summary returned by [`extend_graph`] after merging one graph into another.
///
/// The node/edge counters are sums of the matching counters from every
/// per-group `add_nodes` / `add_connections` report produced during the merge.
#[derive(Debug, Clone)]
pub struct ExtendReport {
    /// Sum of `nodes_created` over all `add_nodes` calls.
    pub nodes_created: usize,
    /// Sum of `nodes_updated` over all `add_nodes` calls.
    pub nodes_updated: usize,
    /// Sum of `nodes_skipped` over all `add_nodes` calls.
    pub nodes_skipped: usize,
    /// Sum of `connections_created` over all `add_connections` calls.
    pub edges_created: usize,
    /// Sum of `connections_skipped` over all `add_connections` calls.
    pub edges_skipped: usize,
    /// Number of distinct node types found among the source graph's nodes.
    pub node_types_merged: usize,
    /// Number of distinct connection type names among the source graph's
    /// edges whose endpoints could be resolved.
    pub connection_types_merged: usize,
    /// Number of secondary labels for which `add_node_label` returned `true`
    /// on a target node (presumably: labels the node did not already have —
    /// confirm against `add_node_label`).
    pub labels_unioned: usize,
    /// Wall-clock duration of the merge in milliseconds.
    pub processing_time_ms: f64,
    /// Errors collected from the `errors` field of each `add_nodes` /
    /// `add_connections` report; these do not abort the merge.
    pub errors: Vec<String>,
}
/// All source nodes of a single node type, staged for one `add_nodes` call.
struct NodeGroup {
    /// Property names seen across `rows`, in first-seen order.
    columns: Vec<String>,
    /// The same names as `columns`, used for fast duplicate checks.
    column_set: std::collections::HashSet<String>,
    /// One `(id, title, properties)` entry per node.
    rows: Vec<(Value, Value, HashMap<String, Value>)>,
}

impl NodeGroup {
    /// Creates a group with no columns and no rows.
    fn new() -> Self {
        Self {
            columns: Vec::new(),
            column_set: Default::default(),
            rows: Vec::new(),
        }
    }

    /// Registers `name` as a property column; repeated names are ignored so
    /// `columns` keeps first-seen order without duplicates.
    fn note_column(&mut self, name: &str) {
        if self.column_set.contains(name) {
            return;
        }
        let owned = name.to_string();
        self.column_set.insert(owned.clone());
        self.columns.push(owned);
    }
}
/// All source edges sharing one `(connection type, source node type, target
/// node type)` combination, staged for one `add_connections` call.
struct EdgeGroup {
    /// Node type of every edge's source endpoint.
    source_type: String,
    /// Node type of every edge's target endpoint.
    target_type: String,
    /// Property names seen across `rows`, in first-seen order.
    columns: Vec<String>,
    /// The same names as `columns`, used for fast duplicate checks.
    column_set: std::collections::HashSet<String>,
    /// One `(source node id, target node id, properties)` entry per edge.
    rows: Vec<(Value, Value, HashMap<String, Value>)>,
}

impl EdgeGroup {
    /// Creates an empty group for edges running from `source_type` nodes to
    /// `target_type` nodes.
    fn new(source_type: String, target_type: String) -> Self {
        let columns = Vec::new();
        let column_set = std::collections::HashSet::new();
        let rows = Vec::new();
        Self {
            source_type,
            target_type,
            columns,
            column_set,
            rows,
        }
    }

    /// Appends `name` to `columns` the first time it is seen; later calls
    /// with the same name are no-ops.
    fn note_column(&mut self, name: &str) {
        if !self.column_set.contains(name) {
            self.columns.push(name.to_owned());
            self.column_set.insert(name.to_owned());
        }
    }
}
pub fn extend_graph(
target: &mut DirGraph,
source: &DirGraph,
conflict_handling: Option<String>,
) -> Result<ExtendReport, String> {
let start = std::time::Instant::now();
if target.graph.is_mapped() || target.graph.is_disk() {
return Err(scope_error("target"));
}
if source.graph.is_mapped() || source.graph.is_disk() {
return Err(scope_error("source"));
}
let mut node_groups: HashMap<String, NodeGroup> = HashMap::new();
let mut label_carriers: Vec<(String, Value, Vec<String>)> = Vec::new();
for idx in source.graph.node_indices() {
let Some(node) = source.graph.node_weight(idx) else {
continue;
};
let node_type = node.node_type_str(&source.interner).to_string();
let id = node.id().into_owned();
let title = node.title().into_owned();
let props = node.properties_cloned(&source.interner);
let group = node_groups
.entry(node_type.clone())
.or_insert_with(NodeGroup::new);
for k in props.keys() {
group.note_column(k);
}
group.rows.push((id.clone(), title, props));
let labels = source.node_labels(idx);
if labels.len() > 1 {
let secondaries: Vec<String> = labels
.iter()
.skip(1)
.map(|k| source.interner.resolve(*k).to_string())
.collect();
label_carriers.push((node_type, id, secondaries));
}
}
let mut report = ExtendReport {
nodes_created: 0,
nodes_updated: 0,
nodes_skipped: 0,
edges_created: 0,
edges_skipped: 0,
node_types_merged: node_groups.len(),
connection_types_merged: 0,
labels_unioned: 0,
processing_time_ms: 0.0,
errors: Vec::new(),
};
for (node_type, group) in node_groups {
let target_has_type = target.type_indices.get(&node_type).is_some();
if !target_has_type {
if let Some(alias) = source.id_field_aliases.get(&node_type) {
target
.id_field_aliases
.insert(node_type.clone(), alias.clone());
}
if let Some(alias) = source.title_field_aliases.get(&node_type) {
target
.title_field_aliases
.insert(node_type.clone(), alias.clone());
}
}
let df = build_node_dataframe(&group)?;
let r: NodeOperationReport = add_nodes(
target,
df,
node_type,
"id".to_string(),
Some("title".to_string()),
conflict_handling.clone(),
)?;
report.nodes_created += r.nodes_created;
report.nodes_updated += r.nodes_updated;
report.nodes_skipped += r.nodes_skipped;
report.errors.extend(r.errors);
}
for (node_type, id, secondaries) in label_carriers {
if let Some(target_idx) = target.lookup_by_id(&node_type, &id) {
for label in secondaries {
let key = target.interner.get_or_intern(&label);
if target.add_node_label(target_idx, key) {
report.labels_unioned += 1;
}
}
}
}
let mut edge_groups: HashMap<(String, String, String), EdgeGroup> = HashMap::new();
for edge_idx in source.graph.edge_indices() {
let Some(edge) = source.graph.edge_weight(edge_idx) else {
continue;
};
let Some((src_idx, tgt_idx)) = source.graph.edge_endpoints(edge_idx) else {
continue;
};
let conn_type = edge.connection_type_str(&source.interner).to_string();
let (Some(src_node), Some(tgt_node)) = (
source.graph.node_weight(src_idx),
source.graph.node_weight(tgt_idx),
) else {
continue;
};
let source_type = src_node.node_type_str(&source.interner).to_string();
let target_type = tgt_node.node_type_str(&source.interner).to_string();
let src_id = src_node.id().into_owned();
let tgt_id = tgt_node.id().into_owned();
let props = edge.properties_cloned(&source.interner);
let group = edge_groups
.entry((conn_type.clone(), source_type.clone(), target_type.clone()))
.or_insert_with(|| EdgeGroup::new(source_type, target_type));
for k in props.keys() {
group.note_column(k);
}
group.rows.push((src_id, tgt_id, props));
}
report.connection_types_merged = edge_groups
.keys()
.map(|(ct, _, _)| ct.clone())
.collect::<std::collections::HashSet<_>>()
.len();
for ((conn_type, _, _), group) in edge_groups {
let df = build_edge_dataframe(&group)?;
let r: ConnectionOperationReport = add_connections(
target,
df,
conn_type,
group.source_type,
"src_id".to_string(),
group.target_type,
"tgt_id".to_string(),
None,
None,
conflict_handling.clone(),
)?;
report.edges_created += r.connections_created;
report.edges_skipped += r.connections_skipped;
report.errors.extend(r.errors);
}
report.processing_time_ms = start.elapsed().as_secs_f64() * 1000.0;
Ok(report)
}
/// Builds the error message returned when the `which` graph (`"target"` or
/// `"source"`) is mapped or disk-backed instead of in-memory.
fn scope_error(which: &str) -> String {
    const HEAD: &str =
        "extend() requires both graphs to use in-memory (Default) storage, but the ";
    const TAIL: &str = " graph is mapped/disk-backed. Merge by exporting one graph and \
                        re-importing it into the other (e.g. export to CSV / a blueprint, then \
                        add_nodes / add_connections), or rebuild both in memory before extending.";
    [HEAD, which, TAIL].concat()
}
/// Converts a [`NodeGroup`] into the DataFrame handed to `add_nodes`.
///
/// Column layout: `id`, `title`, then the group's property columns in
/// first-seen order. A node lacking one of those properties gets
/// `Value::Null` in that cell.
///
/// NOTE(review): a property literally named `id` or `title` would produce a
/// duplicate column name here. Confirm how `DataFrame::from_cypher_rows`
/// treats that.
fn build_node_dataframe(group: &NodeGroup) -> Result<DataFrame, String> {
    let header: Vec<String> = ["id", "title"]
        .iter()
        .map(|name| name.to_string())
        .chain(group.columns.iter().cloned())
        .collect();
    let mut table: Vec<Vec<Value>> = Vec::with_capacity(group.rows.len());
    for (id, title, props) in &group.rows {
        let mut cells: Vec<Value> = Vec::with_capacity(header.len());
        cells.push(id.clone());
        cells.push(title.clone());
        cells.extend(
            group
                .columns
                .iter()
                .map(|col| props.get(col).cloned().unwrap_or(Value::Null)),
        );
        table.push(cells);
    }
    DataFrame::from_cypher_rows(header, table)
}
/// Converts an [`EdgeGroup`] into the DataFrame handed to `add_connections`.
///
/// Column layout: `src_id`, `tgt_id`, then the group's property columns in
/// first-seen order. An edge lacking one of those properties gets
/// `Value::Null` in that cell.
///
/// NOTE(review): a property literally named `src_id` or `tgt_id` would
/// produce a duplicate column name here. Confirm how
/// `DataFrame::from_cypher_rows` treats that.
fn build_edge_dataframe(group: &EdgeGroup) -> Result<DataFrame, String> {
    let mut header = vec![String::from("src_id"), String::from("tgt_id")];
    header.extend_from_slice(&group.columns);
    let table: Vec<Vec<Value>> = group
        .rows
        .iter()
        .map(|(src, tgt, props)| {
            let mut cells = vec![src.clone(), tgt.clone()];
            cells.reserve(group.columns.len());
            for name in &group.columns {
                let cell = match props.get(name) {
                    Some(v) => v.clone(),
                    None => Value::Null,
                };
                cells.push(cell);
            }
            cells
        })
        .collect();
    DataFrame::from_cypher_rows(header, table)
}