use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use crate::constants::*;
use crate::error::Result;
use crate::types::*;
use crate::util::compression::CompressionOptions;
use super::manifest::{update_manifest_for_compaction, write_manifest};
use super::snapshot::reader::SnapshotData;
use super::snapshot::writer::{build_snapshot, EdgeData, NodeData, SnapshotBuildInput};
use super::wal::writer::create_wal_segment;
/// Options controlling a call to [`optimize`].
#[derive(Debug, Clone, Default)]
pub struct OptimizeOptions {
/// Compression settings cloned into `SnapshotBuildInput::compression` when the
/// compacted snapshot is built. `None` by default.
pub compression: Option<CompressionOptions>,
}
/// The live graph contents produced by [`collect_graph_data`], field-for-field
/// the data that `optimize` hands to `SnapshotBuildInput`.
pub struct CollectedGraphData {
/// Surviving nodes with their key, ascending-sorted label ids and merged properties.
pub nodes: Vec<NodeData>,
/// Surviving edges (source, edge type, destination) with merged properties.
pub edges: Vec<EdgeData>,
/// Label id -> name. As filled by `collect_graph_data`, delta names override snapshot names.
pub labels: HashMap<LabelId, String>,
/// Edge-type id -> name, merged the same way as `labels`.
pub etypes: HashMap<ETypeId, String>,
/// Property-key id -> name, merged the same way as `labels`.
pub propkeys: HashMap<PropKeyId, String>,
}
pub fn optimize(
db_path: &Path,
snapshot: Option<&SnapshotData>,
delta: &DeltaState,
manifest: &ManifestV1,
options: &OptimizeOptions,
) -> Result<(ManifestV1, PathBuf)> {
let collected = collect_graph_data(snapshot, delta)?;
let new_gen = manifest.active_snapshot_gen + 1;
let new_wal_seg = manifest.active_wal_seg + 1;
let input = SnapshotBuildInput {
generation: new_gen,
nodes: collected.nodes,
edges: collected.edges,
labels: collected.labels,
etypes: collected.etypes,
propkeys: collected.propkeys,
compression: options.compression.clone(),
};
let snapshot_path = build_snapshot(db_path, input)?;
create_wal_segment(db_path, new_wal_seg)?;
let new_manifest = update_manifest_for_compaction(manifest, new_gen, new_wal_seg);
write_manifest(db_path, &new_manifest)?;
gc_snapshots(
db_path,
new_manifest.active_snapshot_gen,
new_manifest.prev_snapshot_gen,
)?;
Ok((new_manifest, PathBuf::from(snapshot_path)))
}
pub fn collect_graph_data(
snapshot: Option<&SnapshotData>,
delta: &DeltaState,
) -> Result<CollectedGraphData> {
let mut nodes: Vec<NodeData> = Vec::new();
let mut edges: Vec<EdgeData> = Vec::new();
let mut labels: HashMap<LabelId, String> = HashMap::new();
let mut etypes: HashMap<ETypeId, String> = HashMap::new();
let mut propkeys: HashMap<PropKeyId, String> = HashMap::new();
if let Some(snap) = snapshot {
let num_nodes = snap.header.num_nodes as usize;
for label_id in 1..=snap.header.num_labels as LabelId {
if let Some(name) = snap.get_label_name(label_id) {
labels.insert(label_id, name.to_string());
}
}
for etype_id in 1..=snap.header.num_etypes as ETypeId {
if let Some(name) = snap.get_etype_name(etype_id) {
etypes.insert(etype_id, name.to_string());
}
}
for propkey_id in 1..=snap.header.num_propkeys as PropKeyId {
if let Some(name) = snap.get_propkey_name(propkey_id) {
propkeys.insert(propkey_id, name.to_string());
}
}
for phys in 0..num_nodes {
let node_id = match snap.get_node_id(phys as PhysNode) {
Some(id) => id,
None => continue,
};
if delta.deleted_nodes.contains(&node_id) {
continue;
}
let key = snap.get_node_key(phys as PhysNode);
let mut props: HashMap<PropKeyId, PropValue> = HashMap::new();
if let Some(snapshot_props) = snap.get_node_props(phys as PhysNode) {
for (key_id, value) in snapshot_props {
props.insert(key_id, value);
}
}
let mut node_labels: std::collections::HashSet<LabelId> = std::collections::HashSet::new();
if let Some(snapshot_labels) = snap.get_node_labels(phys as PhysNode) {
node_labels.extend(snapshot_labels.into_iter());
}
if let Some(node_delta) = delta.modified_nodes.get(&node_id) {
if let Some(delta_props) = &node_delta.props {
for (key_id, value) in delta_props {
if let Some(v) = value {
props.insert(*key_id, v.clone());
} else {
props.remove(key_id);
}
}
}
if let Some(delta_labels) = &node_delta.labels {
node_labels.extend(delta_labels.iter().copied());
}
if let Some(deleted) = &node_delta.labels_deleted {
for label_id in deleted {
node_labels.remove(label_id);
}
}
}
let mut node_labels: Vec<LabelId> = node_labels.into_iter().collect();
node_labels.sort_unstable();
nodes.push(NodeData {
node_id,
key,
labels: node_labels,
props,
});
for edge in snap.get_out_edges(phys as PhysNode) {
let dst_node_id = match snap.get_node_id(edge.dst) {
Some(id) => id,
None => continue,
};
if delta.deleted_nodes.contains(&dst_node_id) {
continue;
}
if delta.is_edge_deleted(node_id, edge.etype, dst_node_id) {
continue;
}
let mut edge_props: HashMap<PropKeyId, PropValue> = HashMap::new();
if let Some(edge_idx) = snap.find_edge_index(phys as PhysNode, edge.etype, edge.dst) {
if let Some(snapshot_edge_props) = snap.get_edge_props(edge_idx) {
for (key_id, value) in snapshot_edge_props {
edge_props.insert(key_id, value);
}
}
}
let edge_key = (node_id, edge.etype, dst_node_id);
if let Some(delta_edge_props) = delta.edge_props.get(&edge_key) {
for (key_id, value) in delta_edge_props {
if let Some(v) = value {
edge_props.insert(*key_id, v.clone());
} else {
edge_props.remove(key_id);
}
}
}
edges.push(EdgeData {
src: node_id,
etype: edge.etype,
dst: dst_node_id,
props: edge_props,
});
}
}
}
for (label_id, name) in &delta.new_labels {
labels.insert(*label_id, name.clone());
}
for (etype_id, name) in &delta.new_etypes {
etypes.insert(*etype_id, name.clone());
}
for (propkey_id, name) in &delta.new_propkeys {
propkeys.insert(*propkey_id, name.clone());
}
for (node_id, node_delta) in &delta.created_nodes {
let mut props: HashMap<PropKeyId, PropValue> = HashMap::new();
if let Some(delta_props) = &node_delta.props {
for (key_id, value) in delta_props {
if let Some(v) = value {
props.insert(*key_id, v.clone());
}
}
}
let mut node_labels: Vec<LabelId> = node_delta
.labels
.as_ref()
.map(|l| l.iter().copied().collect())
.unwrap_or_default();
node_labels.sort_unstable();
nodes.push(NodeData {
node_id: *node_id,
key: node_delta.key.clone(),
labels: node_labels,
props,
});
}
for (src, patches) in &delta.out_add {
for patch in patches {
if delta.deleted_nodes.contains(src) || delta.deleted_nodes.contains(&patch.other) {
continue;
}
let edge_key = (*src, patch.etype, patch.other);
let mut edge_props: HashMap<PropKeyId, PropValue> = HashMap::new();
if let Some(delta_edge_props) = delta.edge_props.get(&edge_key) {
for (key_id, value) in delta_edge_props {
if let Some(v) = value {
edge_props.insert(*key_id, v.clone());
}
}
}
edges.push(EdgeData {
src: *src,
etype: patch.etype,
dst: patch.other,
props: edge_props,
});
}
}
Ok(CollectedGraphData {
nodes,
edges,
labels,
etypes,
propkeys,
})
}
/// Removes snapshot files other than the `active_gen` and `prev_gen` generations.
///
/// Best-effort; this function always returns `Ok(())`:
/// - A missing or unreadable snapshots directory means nothing is collected.
/// - Unreadable directory entries are skipped.
/// - Entries whose names `parse_snapshot_gen` does not recognise are left untouched.
/// - A stale entry that cannot be deleted is moved into the trash directory
///   instead, where `clean_trash` can retry later.
fn gc_snapshots(db_path: &Path, active_gen: u64, prev_gen: u64) -> Result<()> {
    let snapshots_dir = db_path.join(SNAPSHOTS_DIR);
    if !snapshots_dir.exists() {
        return Ok(());
    }

    let listing = match fs::read_dir(&snapshots_dir) {
        Ok(listing) => listing,
        Err(_) => return Ok(()),
    };

    for entry in listing.filter_map(|item| item.ok()) {
        let file_name = entry.file_name();
        let generation = match parse_snapshot_gen(&file_name.to_string_lossy()) {
            Some(generation) => generation,
            None => continue,
        };

        let still_referenced = generation == active_gen || generation == prev_gen;
        if still_referenced {
            continue;
        }

        let stale_path = entry.path();
        if fs::remove_file(&stale_path).is_ok() {
            continue;
        }

        // Deletion failed: park the entry in the trash so it no longer sits among
        // live snapshots. Failures here are deliberately ignored as well.
        let trash_dir = db_path.join(TRASH_DIR);
        let _ = fs::create_dir_all(&trash_dir);
        let _ = fs::rename(&stale_path, trash_dir.join(&file_name));
    }

    Ok(())
}
/// Permanently deletes everything in the database's trash directory.
///
/// The trash directory receives snapshot entries that `gc_snapshots` could not
/// delete in place. It moves any snapshot-named entry whose `fs::remove_file`
/// failed, and `remove_file` always fails on a directory. Such entries are
/// therefore not guaranteed to be plain files, so directories are removed
/// recursively here instead of lingering forever.
///
/// Best-effort: a missing or unreadable trash directory, unreadable entries
/// and individual removal failures are all ignored, and the function always
/// returns `Ok(())`.
pub fn clean_trash(db_path: &Path) -> Result<()> {
    let trash_dir = db_path.join(TRASH_DIR);
    if !trash_dir.exists() {
        return Ok(());
    }
    let entries = match fs::read_dir(&trash_dir) {
        Ok(e) => e,
        Err(_) => return Ok(()),
    };
    for entry in entries.flatten() {
        let path = entry.path();
        // `DirEntry::file_type` does not follow symlinks, so a symlink to a
        // directory is unlinked rather than having its target's contents removed.
        let is_dir = entry.file_type().map(|t| t.is_dir()).unwrap_or(false);
        // Failures are ignored on purpose; leftovers are retried on the next call.
        let _ = if is_dir {
            fs::remove_dir_all(&path)
        } else {
            fs::remove_file(&path)
        };
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// With neither a snapshot nor any delta content, every collection is empty.
    #[test]
    fn test_collect_graph_data_empty() {
        let delta = DeltaState::new();
        let collected = collect_graph_data(None, &delta).unwrap();

        assert!(collected.nodes.is_empty());
        assert!(collected.edges.is_empty());
        assert!(collected.labels.is_empty());
        assert!(collected.etypes.is_empty());
        assert!(collected.propkeys.is_empty());
    }

    /// Nodes, edges and dictionary names that exist only in the delta are collected.
    #[test]
    fn test_collect_graph_data_delta_only() {
        let mut delta = DeltaState::new();
        delta.new_labels.insert(1, "Person".to_string());
        delta.new_etypes.insert(1, "KNOWS".to_string());
        delta.new_propkeys.insert(1, "name".to_string());

        for (node_id, key, name) in [(1, "alice", "Alice"), (2, "bob", "Bob")] {
            delta.create_node(node_id, Some(key));
            delta.set_node_prop(node_id, 1, PropValue::String(name.to_string()));
        }
        delta.add_edge(1, 1, 2);

        let collected = collect_graph_data(None, &delta).unwrap();

        assert_eq!(collected.nodes.len(), 2);
        assert_eq!(collected.edges.len(), 1);
        assert_eq!(collected.labels.len(), 1);
        assert_eq!(collected.etypes.len(), 1);
        assert_eq!(collected.propkeys.len(), 1);

        let edge = &collected.edges[0];
        assert_eq!((edge.src, edge.etype, edge.dst), (1, 1, 2));
    }

    /// Deleting a node removes it and every edge that touches it.
    #[test]
    fn test_collect_graph_data_with_deletion() {
        let mut delta = DeltaState::new();
        for node_id in 1..=3 {
            delta.create_node(node_id, None);
        }
        delta.new_etypes.insert(1, "LINK".to_string());
        delta.add_edge(1, 1, 2);
        delta.add_edge(2, 1, 3);
        delta.delete_node(2);

        let collected = collect_graph_data(None, &delta).unwrap();

        // Node 2 is gone, and both edges referenced it.
        assert_eq!(collected.nodes.len(), 2);
        assert_eq!(collected.edges.len(), 0);
    }

    /// An edge added and then deleted within the same delta is not collected.
    #[test]
    fn test_collect_graph_data_edge_deletion() {
        let mut delta = DeltaState::new();
        for node_id in 1..=2 {
            delta.create_node(node_id, None);
        }
        delta.new_etypes.insert(1, "LINK".to_string());
        delta.add_edge(1, 1, 2);
        delta.delete_edge(1, 1, 2);

        let collected = collect_graph_data(None, &delta).unwrap();

        // Both endpoints survive; only the edge is gone.
        assert_eq!(collected.nodes.len(), 2);
        assert_eq!(collected.edges.len(), 0);
    }

    /// By default no compression settings are passed to the snapshot writer.
    #[test]
    fn test_optimize_options_default() {
        let options = OptimizeOptions::default();
        assert!(options.compression.is_none());
    }
}