use crate::algorithms::four_d::{GraphNode4D, TemporalEdge};
use crate::storage::data_structures::{EdgeRec, NodeRec};
use crate::storage::graph_storage_manager::GraphStorageManager;
use crate::storage::prop_store::{self, NodeProperties, PropertyValue};
use anyhow::{Context, Result};
use serde_json::Number;
use std::collections::{BTreeMap, HashMap};
use std::path::Path;
const PROPS_PROP_FILE: &str = "properties.prop";
fn json_to_prop(val: &serde_json::Value) -> Option<PropertyValue> {
match val {
serde_json::Value::Null => Some(PropertyValue::Null),
serde_json::Value::Bool(v) => Some(PropertyValue::Bool(*v)),
serde_json::Value::Number(n) => n
.as_u64()
.map(PropertyValue::U64)
.or_else(|| n.as_i64().map(PropertyValue::I64))
.or_else(|| n.as_f64().map(PropertyValue::F64)),
serde_json::Value::String(s) => Some(PropertyValue::String(s.clone())),
serde_json::Value::Array(arr) => {
let mut out = Vec::with_capacity(arr.len());
for item in arr {
match item {
serde_json::Value::Number(n) => {
if let Some(f) = n.as_f64() {
out.push(f);
} else {
return Some(PropertyValue::String(val.to_string()));
}
}
_ => return Some(PropertyValue::String(val.to_string())),
}
}
Some(PropertyValue::F64Array(out))
}
serde_json::Value::Object(_) => {
Some(PropertyValue::String(val.to_string()))
}
}
}
fn prop_to_json(val: &PropertyValue) -> serde_json::Value {
match val {
PropertyValue::Null => serde_json::Value::Null,
PropertyValue::Bool(v) => serde_json::Value::Bool(*v),
PropertyValue::U64(v) => serde_json::Value::Number(Number::from(*v)),
PropertyValue::I64(v) => serde_json::Value::Number(Number::from(*v)),
PropertyValue::F64(v) => {
serde_json::Value::Number(Number::from_f64(*v).unwrap_or_else(|| Number::from(0)))
}
PropertyValue::String(s) => serde_json::Value::String(s.clone()),
PropertyValue::F64Array(arr) => serde_json::Value::Array(
arr.iter()
.map(|v| {
serde_json::Value::Number(
Number::from_f64(*v).unwrap_or_else(|| Number::from(0)),
)
})
.collect(),
),
PropertyValue::Bytes(_b) => serde_json::Value::Null,
}
}
fn graph_props_to_internal(props: &BTreeMap<String, serde_json::Value>) -> NodeProperties {
let mut out = NodeProperties::new();
for (k, v) in props {
if let Some(pv) = json_to_prop(v) {
out.insert(k.clone(), pv);
}
}
out
}
fn internal_to_graph_props(props: &NodeProperties) -> BTreeMap<String, serde_json::Value> {
let mut out = BTreeMap::new();
for (k, v) in props {
out.insert(k.clone(), prop_to_json(v));
}
out
}
pub fn save_graph4d(nodes: &[GraphNode4D], dir: &Path) -> Result<()> {
let sm = GraphStorageManager::open(dir, 4096, 4096, 4096, 4096)
.context("Failed to open GraphStorageManager for save")?;
let mut id_map: HashMap<u64, u64> = HashMap::new();
for (idx, node) in nodes.iter().enumerate() {
id_map.insert(node.id, idx as u64);
}
let mut prop_nodes = Vec::with_capacity(nodes.len());
for (seq_id, node) in nodes.iter().enumerate() {
let seq_id = seq_id as u64;
let mut internal_props = graph_props_to_internal(&node.properties);
internal_props.insert("_original_id".to_string(), PropertyValue::U64(node.id));
prop_nodes.push((seq_id, internal_props));
}
let prop_store = prop_store::build_prop_store(prop_nodes);
let prop_path = dir.join(PROPS_PROP_FILE);
prop_store::write_prop_store(&prop_store, &prop_path).context("write binary property store")?;
let mut all_edges = Vec::new();
for (seq_id, node) in nodes.iter().enumerate() {
let seq_id = seq_id as u64;
let off = all_edges.len() as u32;
let len = node.successors.len() as u32;
let node_rec = NodeRec {
id: seq_id,
morton_code: node.id, x: node.x,
y: node.y,
z: node.z,
edge_off: off,
edge_len: len,
flags: 0,
begin_ts: node.begin_ts,
end_ts: node.end_ts,
tx_id: 0,
visibility: 1,
_padding: [0u8; 7],
};
sm.write_node_at(&node_rec)
.with_context(|| format!("write_node_at seq_id={}", seq_id))?;
for edge in &node.successors {
let remapped_dst = *id_map
.get(&edge.dst)
.with_context(|| format!("edge dst {} not found in node list", edge.dst))?;
let edge_rec = EdgeRec {
src: seq_id,
dst: remapped_dst,
w: edge.weight,
flags: 0,
begin_ts: edge.begin_ts,
end_ts: edge.end_ts,
tx_id: 0,
visibility: 1,
_padding: [0u8; 7],
};
all_edges.push(edge_rec);
}
}
for edge_rec in &all_edges {
sm.append_edge_record(edge_rec)
.context("append_edge_record failed")?;
}
sm.sync_all().context("sync_all after save")?;
Ok(())
}
pub fn load_graph4d(dir: &Path) -> Result<Vec<GraphNode4D>> {
let sm = GraphStorageManager::open(dir, 4096, 4096, 4096, 4096)
.context("Failed to open GraphStorageManager for load")?;
let node_count = sm.node_count();
let edge_count = sm.edge_count();
let prop_path = dir.join(PROPS_PROP_FILE);
let prop_data: Option<prop_store::PropStore> = if prop_path.exists() {
Some(prop_store::read_prop_store(&prop_path).context("read binary property store")?)
} else {
None
};
let mut nodes = Vec::with_capacity(node_count);
let mut edges_by_node: HashMap<u64, Vec<TemporalEdge>> = HashMap::new();
for i in 0..edge_count as u32 {
let edge_rec = sm
.read_edge_raw(i)
.with_context(|| format!("read edge {}", i))?;
let original_dst = if let Some(ref ps) = prop_data {
if let Some(props) = prop_store::lookup(ps, edge_rec.dst)? {
props
.get("_original_id")
.and_then(|v| match v {
PropertyValue::U64(id) => Some(*id),
_ => None,
})
.unwrap_or(edge_rec.dst)
} else {
edge_rec.dst
}
} else {
edge_rec.dst
};
let temporal = TemporalEdge {
dst: original_dst,
weight: edge_rec.w,
begin_ts: edge_rec.begin_ts,
end_ts: edge_rec.end_ts,
};
edges_by_node
.entry(edge_rec.src)
.or_default()
.push(temporal);
}
for seq_id in 0..node_count as u64 {
let rec = sm
.get_node_record(seq_id)
.with_context(|| format!("read node {}", seq_id))?;
let (original_id, props) = if let Some(ref ps) = prop_data {
if let Some(internal_props) = prop_store::lookup(ps, seq_id)? {
let original_id = internal_props
.get("_original_id")
.and_then(|v| match v {
PropertyValue::U64(id) => Some(*id),
_ => None,
})
.unwrap_or(rec.id);
let mut clean = internal_props.clone();
clean.remove("_original_id");
(original_id, internal_to_graph_props(&clean))
} else {
(rec.id, BTreeMap::new())
}
} else {
(rec.id, BTreeMap::new())
};
let successors = edges_by_node.remove(&seq_id).unwrap_or_default();
nodes.push(GraphNode4D {
id: original_id,
x: rec.x,
y: rec.y,
z: rec.z,
begin_ts: rec.begin_ts,
end_ts: rec.end_ts,
properties: props,
successors,
});
}
Ok(nodes)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::algorithms::four_d::GraphNode4D;
use tempfile::tempdir;
fn make_node(id: u64, x: f32, y: f32, z: f32) -> GraphNode4D {
GraphNode4D {
id,
x,
y,
z,
begin_ts: id * 10,
end_ts: id * 10 + 100,
properties: std::collections::BTreeMap::new(),
successors: vec![],
}
}
#[test]
fn test_save_load_roundtrip() {
let dir = tempdir().unwrap();
let mut nodes = vec![
make_node(0, 0.0, 0.0, 0.0),
make_node(1, 1.0, 0.0, 0.0),
make_node(2, 0.0, 1.0, 0.0),
];
nodes[0].successors.push(TemporalEdge {
dst: 1,
weight: 1.5,
begin_ts: 0,
end_ts: 50,
});
nodes[1].successors.push(TemporalEdge {
dst: 2,
weight: 2.0,
begin_ts: 10,
end_ts: 60,
});
save_graph4d(&nodes, dir.path()).unwrap();
let loaded = load_graph4d(dir.path()).unwrap();
assert_eq!(loaded.len(), 3);
assert_eq!(loaded[0].id, 0);
assert_eq!(loaded[0].x, 0.0);
assert_eq!(loaded[0].successors.len(), 1);
assert_eq!(loaded[0].successors[0].dst, 1);
assert_eq!(loaded[0].successors[0].weight, 1.5);
assert_eq!(loaded[1].successors.len(), 1);
assert_eq!(loaded[1].successors[0].dst, 2);
assert_eq!(loaded[1].successors[0].weight, 2.0);
assert_eq!(loaded[2].successors.len(), 0);
}
#[test]
fn test_properties_preserved() {
let dir = tempdir().unwrap();
let mut node = make_node(5, 3.0, 4.0, 5.0);
node.properties.insert(
"kind".to_string(),
serde_json::Value::String("hub".to_string()),
);
node.properties.insert(
"capacity".to_string(),
serde_json::Value::Number(100.into()),
);
save_graph4d(&[node.clone()], dir.path()).unwrap();
let loaded = load_graph4d(dir.path()).unwrap();
assert_eq!(loaded.len(), 1);
assert_eq!(loaded[0].id, 5); assert_eq!(loaded[0].x, 3.0);
assert_eq!(loaded[0].y, 4.0);
assert_eq!(loaded[0].z, 5.0);
assert_eq!(
loaded[0].properties.get("kind"),
Some(&serde_json::Value::String("hub".to_string()))
);
assert_eq!(
loaded[0].properties.get("capacity"),
Some(&serde_json::Value::Number(100.into()))
);
assert!(!loaded[0].properties.contains_key("_original_id"));
}
#[test]
fn test_non_sequential_ids_roundtrip() {
let dir = tempdir().unwrap();
let mut a = make_node(100, 1.0, 2.0, 3.0);
let b = make_node(200, 4.0, 5.0, 6.0);
a.successors.push(TemporalEdge {
dst: 200,
weight: 3.0,
begin_ts: 0,
end_ts: 100,
});
save_graph4d(&[a, b], dir.path()).unwrap();
let loaded = load_graph4d(dir.path()).unwrap();
assert_eq!(loaded.len(), 2);
assert_eq!(loaded[0].id, 100);
assert_eq!(loaded[1].id, 200);
assert_eq!(loaded[0].successors.len(), 1);
assert_eq!(loaded[0].successors[0].dst, 200);
assert!(!loaded[0].properties.contains_key("_original_id"));
}
}