use crate::graph::schema::InternedKey;
use crate::graph::storage::mapped::mmap_vec::MmapOrVec;
use serde::{Deserialize, Serialize};
use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use super::csr::TOMBSTONE_EDGE;
use super::edge_properties::{EdgePropertyStore, EdgePropertyStoreMeta};
use super::graph::{enumerate_segment_dirs, segment_subdir, DiskGraph, CURRENT_CSR_LAYOUT_VERSION};
use super::property_index;
/// Metadata for a saved graph, persisted as `disk_graph_meta.json`.
///
/// Written by `DiskGraph::write_metadata_to` and parsed first by
/// `DiskGraph::load_from_dir`. The `*_len` fields hold `len()` of the corresponding
/// in-memory arrays at save time; the single-segment and legacy load paths use them to
/// size the arrays they map. Fields carrying a `#[serde(default…)]` attribute may be
/// absent from a metadata file and then take the default noted on the field.
#[derive(Serialize, Deserialize)]
struct DiskGraphMeta {
/// `DiskGraph::node_count` at save time.
node_count: usize,
/// Length of `node_slots`.
node_slots_len: usize,
/// `DiskGraph::edge_count` at save time.
edge_count: usize,
/// `DiskGraph::next_edge_idx` at save time.
next_edge_idx: u32,
/// Length of `out_offsets`.
out_offsets_len: usize,
/// Length of `out_edges`.
out_edges_len: usize,
/// Length of `in_offsets`.
in_offsets_len: usize,
/// Length of `in_edges`.
in_edges_len: usize,
/// Length of `edge_endpoints`.
edge_endpoints_len: usize,
/// Copy of `DiskGraph::free_node_slots`.
free_node_slots: Vec<u32>,
/// Copy of `DiskGraph::free_edge_slots`.
free_edge_slots: Vec<u32>,
/// Mirrors `DiskGraph::csr_sorted_by_type`; `false` when absent.
#[serde(default)]
csr_sorted_by_type: bool,
/// Mirrors `DiskGraph::has_tombstones`; `true` when absent (`default_has_tombstones`).
#[serde(default = "default_has_tombstones")]
has_tombstones: bool,
/// Format tag handed to `EdgePropertyStore::load_from`; always written as `1`,
/// `0` when absent.
#[serde(default)]
edge_properties_format: u8,
/// Edge-property store metadata obtained from `EdgePropertyStore::meta_for`.
#[serde(default)]
edge_properties_meta: EdgePropertyStoreMeta,
/// Written as `CURRENT_CSR_LAYOUT_VERSION`. `0` (also when absent) selects the legacy
/// layout with CSR files directly in the graph directory; `>= 1` means the CSR files
/// live in segment subdirectories under it.
#[serde(default)]
csr_layout_version: u8,
/// Node ids below this bound have already been written to an on-disk segment.
/// `0` when absent; on load, `0` together with a non-empty segment manifest and
/// `node_count > 0` is replaced by `node_count`.
#[serde(default)]
sealed_nodes_bound: u32,
}
/// Serde default for `DiskGraphMeta::has_tombstones`.
///
/// Metadata files that omit the field deserialize with the flag set.
fn default_has_tombstones() -> bool {
    const FLAG_WHEN_MISSING: bool = true;
    FLAG_WHEN_MISSING
}
impl DiskGraph {
pub(crate) fn write_metadata(&self) -> std::io::Result<()> {
    // Metadata lives next to the CSR files, paired with the edge-property metadata
    // currently found in that same directory.
    let edge_meta = EdgePropertyStore::meta_for(&self.data_dir);
    self.write_metadata_to(self.data_dir.as_path(), edge_meta)
}
/// Serializes this graph's bookkeeping into `dir/disk_graph_meta.json`.
///
/// `edge_props_meta` is stored verbatim. `edge_properties_format` is always written as
/// `1` and `csr_layout_version` as `CURRENT_CSR_LAYOUT_VERSION`.
///
/// The JSON is first written to `disk_graph_meta.json.tmp` and then renamed over the
/// real file. An interrupted write therefore leaves the previous metadata in place
/// instead of a truncated file; `load_from_dir` parses this file before anything else.
///
/// # Errors
/// Returns serialization errors (wrapped as `io::Error`) and filesystem errors.
fn write_metadata_to(
    &self,
    dir: &Path,
    edge_props_meta: EdgePropertyStoreMeta,
) -> std::io::Result<()> {
    let meta = DiskGraphMeta {
        node_count: self.node_count,
        node_slots_len: self.node_slots.len(),
        edge_count: self.edge_count,
        next_edge_idx: self.next_edge_idx,
        out_offsets_len: self.out_offsets.len(),
        out_edges_len: self.out_edges.len(),
        in_offsets_len: self.in_offsets.len(),
        in_edges_len: self.in_edges.len(),
        edge_endpoints_len: self.edge_endpoints.len(),
        free_node_slots: self.free_node_slots.clone(),
        free_edge_slots: self.free_edge_slots.clone(),
        csr_sorted_by_type: self.csr_sorted_by_type,
        has_tombstones: self.has_tombstones,
        edge_properties_format: 1,
        edge_properties_meta: edge_props_meta,
        csr_layout_version: CURRENT_CSR_LAYOUT_VERSION,
        sealed_nodes_bound: self.sealed_nodes_bound,
    };
    let json = serde_json::to_string_pretty(&meta).map_err(std::io::Error::other)?;
    let final_path = dir.join("disk_graph_meta.json");
    let tmp_path = dir.join("disk_graph_meta.json.tmp");
    std::fs::write(&tmp_path, json)?;
    // Replace the old metadata in one step rather than rewriting it in place.
    std::fs::rename(&tmp_path, &final_path)
}
/// Builds a one-entry manifest that describes the whole graph as segment 0.
///
/// The summary records:
/// * `node_id_hi` = `node_count` and the current `edge_count`;
/// * every connection type in the conn-type index, plus types reachable only through
///   non-tombstoned overflow edges;
/// * per-type row counts from the column stores;
/// * each distinct `(type hash, property hash)` pair that has a property index, either
///   populated in the in-memory cache or found on disk under `data_dir`. Each pair is
///   recorded once, with a `StringBloomPlaceholder` range.
fn build_single_segment_manifest(&self) -> super::segment_summary::SegmentManifest {
    use super::segment_summary::{PropRange, SegmentManifest, SegmentSummary};
    use std::collections::HashSet;

    let mut summary = SegmentSummary::new(0, 0);
    summary.node_id_hi = self.node_count as u32;
    summary.edge_count = self.edge_count as u64;

    let indexed_types = &self.conn_type_index_types;
    for pos in 0..indexed_types.len() {
        summary.conn_types.insert(indexed_types.get(pos));
    }
    let live_overflow = self
        .overflow_out
        .values()
        .flatten()
        .filter(|edge| edge.edge_idx != TOMBSTONE_EDGE);
    for edge in live_overflow {
        let endpoints = self.edge_endpoints.get(edge.edge_idx as usize);
        summary.conn_types.insert(endpoints.connection_type);
    }

    for (type_key, store) in &self.column_stores {
        summary
            .node_type_counts
            .insert(type_key.as_u64(), store.row_count());
    }

    // Cached indexes first, then those discovered on disk; duplicates are skipped.
    let mut already_recorded: HashSet<(u64, u64)> = HashSet::new();
    let mut record_index = |type_hash: u64, prop_hash: u64| {
        if already_recorded.insert((type_hash, prop_hash)) {
            summary.indexed_prop_ranges.push((
                type_hash,
                prop_hash,
                PropRange::StringBloomPlaceholder,
            ));
        }
    };
    if let Ok(cache) = self.property_indexes.read() {
        for ((ty, prop), slot) in cache.iter() {
            if slot.is_none() {
                continue;
            }
            record_index(
                InternedKey::from_str(ty).as_u64(),
                InternedKey::from_str(prop).as_u64(),
            );
        }
    }
    for (type_hash, prop_hash) in property_index::scan_segment_hashes(&self.data_dir) {
        record_index(type_hash, prop_hash);
    }

    let mut manifest = SegmentManifest::new();
    manifest.append(summary);
    manifest
}
/// Persists the graph under `target_dir`.
///
/// Two strategies are used:
/// * **Incremental seal.** Used when the graph has a non-empty segment manifest,
///   `target_dir` is the parent of `data_dir`, and nodes exist at or beyond
///   `sealed_nodes_bound`. Only the unsealed tail is written, via
///   `seal_to_new_segment`.
/// * **Full rewrite.** Otherwise, every CSR array is written into
///   `target_dir/segment_subdir(0)`. When that directory is the current `data_dir`,
///   all other segment directories under `target_dir` are removed first.
///
/// Overflow edges are stored as `target_dir/overflow_edges.bin.zst`. When there are
/// none, any stale copy of that file is deleted; otherwise `load_from_dir` would read
/// back edges the in-memory graph no longer has.
///
/// NOTE(review): after `seal_to_new_segment` in the same session, the sealed edges
/// appear to live only in the newer segment directories (the overflow maps are
/// cleared). A later full rewrite into the same directory deletes those directories.
/// Confirm the in-memory CSR still covers those edges in that case.
///
/// `_interner` is currently unused.
///
/// # Errors
/// Propagates I/O and serialization errors. Flushing the conn-type index arrays,
/// trimming mapped arrays, and copying auxiliary index files are best-effort and their
/// errors are ignored.
pub fn save_to_dir(
    &mut self,
    target_dir: &Path,
    _interner: &crate::graph::schema::StringInterner,
) -> std::io::Result<()> {
    self.clear_arenas();
    std::fs::create_dir_all(target_dir)?;

    let have_prior_save = !self.segment_manifest.is_empty();
    let same_dir = target_dir == self.data_dir.parent().unwrap_or(target_dir);
    if have_prior_save && same_dir && self.sealed_nodes_bound < self.node_count as u32 {
        self.seal_to_new_segment(target_dir)?;
        return Ok(());
    }

    let csr_target = target_dir.join(segment_subdir(0));
    std::fs::create_dir_all(&csr_target)?;
    // `data_dir` is not modified below, so this comparison holds for the whole save.
    let rewriting_in_place = csr_target == self.data_dir;
    if rewriting_in_place {
        // Segment 0 is rewritten from the in-memory arrays; later segments are removed.
        for (seg_id, seg_path) in enumerate_segment_dirs(target_dir) {
            if seg_id > 0 {
                let _ = std::fs::remove_dir_all(&seg_path);
            }
        }
    }

    self.node_slots
        .save_to_file(&csr_target.join("node_slots.bin"))?;
    self.out_offsets
        .save_to_file(&csr_target.join("out_offsets.bin"))?;
    self.out_edges
        .save_to_file(&csr_target.join("out_edges.bin"))?;
    self.in_offsets
        .save_to_file(&csr_target.join("in_offsets.bin"))?;
    self.in_edges
        .save_to_file(&csr_target.join("in_edges.bin"))?;
    self.edge_endpoints
        .save_to_file(&csr_target.join("edge_endpoints.bin"))?;

    let overflow_path = target_dir.join("overflow_edges.bin.zst");
    if !self.overflow_out.is_empty() || !self.overflow_in.is_empty() {
        let overflow = (&self.overflow_out, &self.overflow_in);
        let bytes = bincode::serialize(&overflow).map_err(std::io::Error::other)?;
        let compressed =
            zstd::encode_all(bytes.as_slice(), 3).map_err(std::io::Error::other)?;
        std::fs::write(&overflow_path, compressed)?;
    } else {
        // No overflow edges in memory: an overflow file left by an earlier save would
        // be loaded again, so remove it.
        match std::fs::remove_file(&overflow_path) {
            Ok(()) => {}
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => return Err(e),
        }
    }

    let upper = self.next_edge_idx;
    self.edge_properties.save_to(&csr_target, upper)?;
    let edge_props_meta = EdgePropertyStore::meta_for(&csr_target);

    // Best-effort flush of the conn-type index arrays to their own backing files.
    for field in [
        &self.conn_type_index_types as &MmapOrVec<u64>,
        &self.conn_type_index_offsets,
    ] {
        if let Some(path) = field.file_path().map(PathBuf::from) {
            let _ = field.save_to_file(&path);
        }
    }
    if let Some(path) = self.conn_type_index_sources.file_path().map(PathBuf::from) {
        let _ = self.conn_type_index_sources.save_to_file(&path);
    }

    if rewriting_in_place {
        let _ = self.node_slots.trim_to_logical_length();
        let _ = self.out_offsets.trim_to_logical_length();
        let _ = self.out_edges.trim_to_logical_length();
        let _ = self.in_offsets.trim_to_logical_length();
        let _ = self.in_edges.trim_to_logical_length();
        let _ = self.edge_endpoints.trim_to_logical_length();
    }

    let manifest = self.build_single_segment_manifest();
    manifest.save_to(target_dir)?;
    self.segment_manifest = manifest;
    self.write_metadata_to(target_dir, edge_props_meta)?;

    if !rewriting_in_place {
        // The auxiliary index files still sit in the old data_dir; copy them over.
        for fname in [
            "conn_type_index_types.bin",
            "conn_type_index_offsets.bin",
            "conn_type_index_sources.bin",
            "peer_count_types.bin",
            "peer_count_offsets.bin",
            "peer_count_entries.bin",
        ] {
            let src = self.data_dir.join(fname);
            if src.exists() {
                let _ = std::fs::copy(&src, csr_target.join(fname));
            }
        }
    }
    self.sealed_nodes_bound = self.node_count as u32;
    Ok(())
}
/// Seals the unsealed node tail `[sealed_nodes_bound, node_count)` and every live
/// (non-tombstoned) overflow edge into a new segment directory under `root`.
///
/// The new segment id is one past the highest existing segment id (0 if there are
/// none). Its offset arrays are indexed by tail-local node id, unless some sealed edge
/// touches a node below `sealed_nodes_bound`. In that case they are indexed by global
/// node id and span all `node_count` nodes. Edge indices stored in the segment's CSR
/// are positions in its own `edge_endpoints.bin`.
///
/// Afterwards:
/// 1. the segment is appended to the manifest, and the manifest is saved;
/// 2. segment-0 backing files are truncated to their pre-seal lengths (the in-memory
///    arrays keep all entries);
/// 3. any on-disk `overflow_edges.bin.zst` is removed and the overflow maps are cleared;
/// 4. the metadata is rewritten.
///
/// Returns the new segment id.
///
/// # Errors
/// Fails when there is nothing to seal, when more live overflow edges exist than edge
/// indices were allocated (checked before anything is written), or on I/O errors.
/// Writing the segment's conn-type index and peer-count histogram is best-effort.
pub fn seal_to_new_segment(&mut self, root: &Path) -> std::io::Result<u32> {
    use super::csr::{CsrEdge, DiskNodeSlot, EdgeEndpoints};
    use super::segment_summary::SegmentSummary;

    /// One live overflow edge expressed in global node ids.
    struct SealEdge {
        src_global: u32,
        tgt_global: u32,
        conn_type: u64,
    }

    let tail_lo = self.sealed_nodes_bound;
    let tail_hi = self.node_count as u32;
    if tail_hi <= tail_lo {
        return Err(std::io::Error::other(
            "seal_to_new_segment: nothing to seal — node_count <= sealed_nodes_bound",
        ));
    }
    let tail_len = (tail_hi - tail_lo) as usize;

    // Collect every live overflow edge. Also note whether any of them touches a node
    // owned by an earlier segment.
    let mut has_cross_segment = false;
    let mut seal_edges: Vec<SealEdge> = Vec::new();
    for (&src_global, edges) in &self.overflow_out {
        for e in edges {
            if e.edge_idx == TOMBSTONE_EDGE {
                continue;
            }
            let ep = self.edge_endpoints.get(e.edge_idx as usize);
            if ep.source < tail_lo || ep.target < tail_lo {
                has_cross_segment = true;
            }
            seal_edges.push(SealEdge {
                src_global,
                tgt_global: ep.target,
                conn_type: ep.connection_type,
            });
        }
    }
    seal_edges.sort_by_key(|e| (e.src_global, e.conn_type));
    let n_edges = seal_edges.len();

    // Segment 0 keeps the first `next_edge_idx - n_edges` edge endpoints. Validate this
    // before touching disk, so a violated invariant cannot leave a half-built segment.
    let seg0_next_edge_idx = (self.next_edge_idx as usize)
        .checked_sub(n_edges)
        .ok_or_else(|| {
            std::io::Error::other(
                "seal_to_new_segment: more live overflow edges than allocated edge indices",
            )
        })?;

    let next_id = enumerate_segment_dirs(root)
        .iter()
        .map(|(id, _)| *id)
        .max()
        .map(|m| m + 1)
        .unwrap_or(0);
    let seg_dir = root.join(segment_subdir(next_id));
    std::fs::create_dir_all(&seg_dir)?;

    let offsets_len = if has_cross_segment {
        self.node_count + 1
    } else {
        tail_len + 1
    };
    // Row index in this segment's offset arrays for a global node id.
    let offset_key = |s: u32| -> u32 {
        if has_cross_segment {
            s
        } else {
            s - tail_lo
        }
    };

    let mut node_slots: MmapOrVec<DiskNodeSlot> = MmapOrVec::with_capacity(tail_len);
    for i in 0..tail_len {
        node_slots.push(self.node_slots.get(tail_lo as usize + i));
    }
    let mut edge_endpoints: MmapOrVec<EdgeEndpoints> = MmapOrVec::with_capacity(n_edges);
    for e in &seal_edges {
        edge_endpoints.push(EdgeEndpoints {
            source: e.src_global,
            target: e.tgt_global,
            connection_type: e.conn_type,
        });
    }

    // Outgoing CSR: seal_edges is sorted by source, so rows fill in a single pass.
    let mut out_offsets: MmapOrVec<u64> = MmapOrVec::with_capacity(offsets_len);
    let mut out_edges: MmapOrVec<CsrEdge> = MmapOrVec::with_capacity(n_edges);
    let mut cursor = 0usize;
    for k in 0..(offsets_len - 1) as u32 {
        out_offsets.push(cursor as u64);
        while cursor < n_edges && offset_key(seal_edges[cursor].src_global) == k {
            let e = &seal_edges[cursor];
            out_edges.push(CsrEdge {
                peer: e.tgt_global,
                edge_idx: cursor as u32,
            });
            cursor += 1;
        }
    }
    out_offsets.push(cursor as u64);

    // Incoming CSR: re-sort edge positions by target (stable, keeps position order).
    let mut by_target: Vec<(u32, u32)> = seal_edges
        .iter()
        .enumerate()
        .map(|(orig_idx, e)| (e.tgt_global, orig_idx as u32))
        .collect();
    by_target.sort_by_key(|(t, _)| *t);
    let mut in_offsets: MmapOrVec<u64> = MmapOrVec::with_capacity(offsets_len);
    let mut in_edges: MmapOrVec<CsrEdge> = MmapOrVec::with_capacity(n_edges);
    let mut tcursor = 0usize;
    for k in 0..(offsets_len - 1) as u32 {
        in_offsets.push(tcursor as u64);
        while tcursor < n_edges && offset_key(by_target[tcursor].0) == k {
            let (_, orig_idx) = by_target[tcursor];
            let src_peer = seal_edges[orig_idx as usize].src_global;
            in_edges.push(CsrEdge {
                peer: src_peer,
                edge_idx: orig_idx,
            });
            tcursor += 1;
        }
    }
    in_offsets.push(tcursor as u64);

    node_slots.save_to_file(&seg_dir.join("node_slots.bin"))?;
    out_offsets.save_to_file(&seg_dir.join("out_offsets.bin"))?;
    out_edges.save_to_file(&seg_dir.join("out_edges.bin"))?;
    in_offsets.save_to_file(&seg_dir.join("in_offsets.bin"))?;
    in_edges.save_to_file(&seg_dir.join("in_edges.bin"))?;
    edge_endpoints.save_to_file(&seg_dir.join("edge_endpoints.bin"))?;
    // Best-effort: these arrays are loaded as optional, so a failure here is tolerated.
    let _ = super::builder::write_conn_type_index(
        &out_offsets,
        &out_edges,
        &edge_endpoints,
        offsets_len - 1,
        &seg_dir,
        false,
    );
    let _ = super::builder::write_peer_count_histogram(
        &edge_endpoints,
        0,
        n_edges,
        &seg_dir,
        false,
    );
    let upper = self.next_edge_idx;
    self.edge_properties.save_to(&self.data_dir, upper)?;

    let mut summary = SegmentSummary::new(next_id, tail_lo);
    summary.node_id_hi = tail_hi;
    summary.edge_count = n_edges as u64;
    for e in &seal_edges {
        summary.conn_types.insert(e.conn_type);
    }
    for i in 0..tail_len {
        let ns = self.node_slots.get(tail_lo as usize + i);
        if !ns.is_alive() {
            continue;
        }
        *summary.node_type_counts.entry(ns.node_type).or_insert(0) += 1;
    }
    self.segment_manifest.append(summary);
    self.segment_manifest.save_to(root)?;

    reconcile_seg0_csr::<DiskNodeSlot>(&mut self.node_slots, tail_lo as usize)?;
    reconcile_seg0_csr::<u64>(&mut self.out_offsets, tail_lo as usize + 1)?;
    reconcile_seg0_csr::<u64>(&mut self.in_offsets, tail_lo as usize + 1)?;
    reconcile_seg0_csr::<EdgeEndpoints>(&mut self.edge_endpoints, seg0_next_edge_idx)?;

    // The sealed edges now live in the new segment. A leftover overflow file would
    // make load_from_dir read them a second time on top of the segment data.
    match std::fs::remove_file(root.join("overflow_edges.bin.zst")) {
        Ok(()) => {}
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => return Err(e),
    }
    self.overflow_out.clear();
    self.overflow_in.clear();
    self.sealed_nodes_bound = tail_hi;
    let edge_props_meta = EdgePropertyStore::meta_for(&self.data_dir);
    self.write_metadata_to(root, edge_props_meta)?;
    Ok(next_id)
}
pub fn load_from_dir(
dir: &Path,
interner: &mut crate::graph::schema::StringInterner,
) -> std::io::Result<(Self, PathBuf)> {
use crate::graph::io::load_timing::{log_stage, stage_timer};
let t = stage_timer();
let meta_str = std::fs::read_to_string(dir.join("disk_graph_meta.json"))?;
let meta: DiskGraphMeta = serde_json::from_str(&meta_str).map_err(std::io::Error::other)?;
log_stage("dg.meta_parse", t);
let temp_dir = dir.join("_zst_cache");
let t = stage_timer();
let (csr_dir, segment_csr): (PathBuf, SegmentCsr) = if meta.csr_layout_version >= 1 {
let segs = enumerate_segment_dirs(dir);
match segs.len() {
0 => {
return Err(std::io::Error::other(format!(
"csr_layout_version={} but no seg_NNN/ directory found under {}",
meta.csr_layout_version,
dir.display()
)));
}
1 => {
let seg_dir = segs.into_iter().next().unwrap().1;
let csr = SegmentCsr {
node_slots: load_raw_or_zst(
&seg_dir.join("node_slots"),
meta.node_slots_len,
&temp_dir,
)?,
out_offsets: load_raw_or_zst(
&seg_dir.join("out_offsets"),
meta.out_offsets_len,
&temp_dir,
)?,
out_edges: load_raw_or_zst(
&seg_dir.join("out_edges"),
meta.out_edges_len,
&temp_dir,
)?,
in_offsets: load_raw_or_zst(
&seg_dir.join("in_offsets"),
meta.in_offsets_len,
&temp_dir,
)?,
in_edges: load_raw_or_zst(
&seg_dir.join("in_edges"),
meta.in_edges_len,
&temp_dir,
)?,
edge_endpoints: load_raw_or_zst(
&seg_dir.join("edge_endpoints"),
meta.edge_endpoints_len,
&temp_dir,
)?,
conn_type_index_types: load_raw_or_zst_optional(
&seg_dir.join("conn_type_index_types"),
),
conn_type_index_offsets: load_raw_or_zst_optional(
&seg_dir.join("conn_type_index_offsets"),
),
conn_type_index_sources: load_raw_or_zst_optional(
&seg_dir.join("conn_type_index_sources"),
),
peer_count_types: load_raw_or_zst_optional(
&seg_dir.join("peer_count_types"),
),
peer_count_offsets: load_raw_or_zst_optional(
&seg_dir.join("peer_count_offsets"),
),
peer_count_entries: load_raw_or_zst_optional(
&seg_dir.join("peer_count_entries"),
),
};
(seg_dir, csr)
}
_ => {
let mut loaded = Vec::with_capacity(segs.len());
let first_dir = segs[0].1.clone();
for (_, sdir) in &segs {
loaded.push(SegmentCsr::load_from(sdir, &temp_dir)?);
}
let csr = concat_segment_csrs(loaded);
(first_dir, csr)
}
}
} else {
let csr = SegmentCsr {
node_slots: load_raw_or_zst(
&dir.join("node_slots"),
meta.node_slots_len,
&temp_dir,
)?,
out_offsets: load_raw_or_zst(
&dir.join("out_offsets"),
meta.out_offsets_len,
&temp_dir,
)?,
out_edges: load_raw_or_zst(&dir.join("out_edges"), meta.out_edges_len, &temp_dir)?,
in_offsets: load_raw_or_zst(
&dir.join("in_offsets"),
meta.in_offsets_len,
&temp_dir,
)?,
in_edges: load_raw_or_zst(&dir.join("in_edges"), meta.in_edges_len, &temp_dir)?,
edge_endpoints: load_raw_or_zst(
&dir.join("edge_endpoints"),
meta.edge_endpoints_len,
&temp_dir,
)?,
conn_type_index_types: load_raw_or_zst_optional(&dir.join("conn_type_index_types")),
conn_type_index_offsets: load_raw_or_zst_optional(
&dir.join("conn_type_index_offsets"),
),
conn_type_index_sources: load_raw_or_zst_optional(
&dir.join("conn_type_index_sources"),
),
peer_count_types: load_raw_or_zst_optional(&dir.join("peer_count_types")),
peer_count_offsets: load_raw_or_zst_optional(&dir.join("peer_count_offsets")),
peer_count_entries: load_raw_or_zst_optional(&dir.join("peer_count_entries")),
};
(dir.to_path_buf(), csr)
};
log_stage("dg.segment_csr", t);
let SegmentCsr {
node_slots,
out_offsets,
out_edges,
in_offsets,
in_edges,
edge_endpoints,
conn_type_index_types,
conn_type_index_offsets,
conn_type_index_sources,
peer_count_types,
peer_count_offsets,
peer_count_entries,
} = segment_csr;
let t = stage_timer();
let edge_properties = EdgePropertyStore::load_from(
&csr_dir,
meta.edge_properties_format,
meta.edge_properties_meta,
interner,
)?;
log_stage("dg.edge_properties", t);
let t = stage_timer();
let (overflow_out, overflow_in) = if dir.join("overflow_edges.bin.zst").exists() {
let compressed = std::fs::read(dir.join("overflow_edges.bin.zst"))?;
let bytes = zstd::decode_all(compressed.as_slice()).map_err(std::io::Error::other)?;
bincode::deserialize(&bytes).map_err(std::io::Error::other)?
} else {
(HashMap::new(), HashMap::new())
};
log_stage("dg.overflow_edges", t);
let t = stage_timer();
let segment_manifest =
super::segment_summary::SegmentManifest::load_from(dir).unwrap_or_default();
log_stage("dg.segment_manifest", t);
let sealed_nodes_bound = if meta.sealed_nodes_bound == 0
&& !segment_manifest.is_empty()
&& meta.node_count > 0
{
meta.node_count as u32
} else {
meta.sealed_nodes_bound
};
Ok((
DiskGraph {
node_slots,
node_count: meta.node_count,
free_node_slots: meta.free_node_slots,
node_arena: std::sync::Mutex::new(Vec::with_capacity(1024)),
column_stores: HashMap::new(),
out_offsets,
out_edges,
in_offsets,
in_edges,
edge_endpoints,
edge_count: meta.edge_count,
next_edge_idx: meta.next_edge_idx,
edge_properties,
edge_arena: std::sync::Mutex::new(Vec::with_capacity(1024)),
edge_mut_cache: HashMap::new(),
node_mut_cache: HashMap::new(),
pending_edges: UnsafeCell::new(MmapOrVec::new()),
overflow_out,
overflow_in,
free_edge_slots: meta.free_edge_slots,
data_dir: csr_dir.clone(),
metadata_dirty: false,
csr_sorted_by_type: meta.csr_sorted_by_type,
defer_csr: false,
edge_type_counts_raw: None,
conn_type_index_types,
conn_type_index_offsets,
conn_type_index_sources,
peer_count_types,
peer_count_offsets,
peer_count_entries,
has_tombstones: meta.has_tombstones,
property_indexes: std::sync::RwLock::new(HashMap::new()),
global_indexes: std::sync::RwLock::new(HashMap::new()),
segment_manifest,
sealed_nodes_bound,
},
temp_dir,
))
}
}
/// Loads `len` elements from `<base_path>.bin`, or else from `<base_path>.bin.zst`
/// (decompressed into `temp_dir`, which is created on demand).
///
/// Returns an empty array when `len` is zero or neither file exists.
///
/// # Errors
/// Propagates mapping, I/O, and decompression errors.
fn load_raw_or_zst<T: Copy + Default + 'static>(
    base_path: &Path,
    len: usize,
    temp_dir: &Path,
) -> std::io::Result<MmapOrVec<T>> {
    if len == 0 {
        return Ok(MmapOrVec::new());
    }
    let raw_path = base_path.with_extension("bin");
    if raw_path.exists() {
        return MmapOrVec::load_mapped(&raw_path, len);
    }
    let zst_path = base_path.with_extension("bin.zst");
    if !zst_path.exists() {
        return Ok(MmapOrVec::new());
    }
    std::fs::create_dir_all(temp_dir)?;
    load_compressed(&zst_path, len, temp_dir)
}
/// Best-effort loader for optional arrays: maps `<base_path>.bin` with the element
/// count taken as `file size / size_of::<T>()` (a trailing partial element is ignored).
///
/// A missing or empty file, a zero-sized `T`, or a mapping error all yield an empty
/// array.
///
/// NOTE(review): despite the name, no `.bin.zst` fallback is attempted here.
fn load_raw_or_zst_optional<T: Copy + Default + 'static>(base_path: &Path) -> MmapOrVec<T> {
    let raw_path = base_path.with_extension("bin");
    let elem_size = std::mem::size_of::<T>();
    let byte_len = if raw_path.exists() {
        std::fs::metadata(&raw_path).map_or(0, |m| m.len() as usize)
    } else {
        0
    };
    if byte_len == 0 || elem_size == 0 {
        return MmapOrVec::new();
    }
    MmapOrVec::load_mapped(&raw_path, byte_len / elem_size).unwrap_or_else(|_| MmapOrVec::new())
}
/// Decompresses the zstd file at `path` into `temp_dir` and maps `len` elements from it.
///
/// The decompressed copy is named after `path`'s file name with trailing `.zst`
/// removed, or `data` if the name is not valid UTF-8. Returns an empty array when
/// `len` is zero or `path` does not exist.
///
/// # Errors
/// Propagates read, decompression, write, and mapping errors.
fn load_compressed<T: Copy + Default + 'static>(
    path: &Path,
    len: usize,
    temp_dir: &Path,
) -> std::io::Result<MmapOrVec<T>> {
    if len == 0 || !path.exists() {
        return Ok(MmapOrVec::new());
    }
    let decompressed = zstd::decode_all(std::fs::read(path)?.as_slice())?;
    let cache_name = match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name.trim_end_matches(".zst"),
        None => "data",
    };
    let cache_path = temp_dir.join(cache_name);
    std::fs::write(&cache_path, &decompressed)?;
    MmapOrVec::load_mapped(&cache_path, len)
}
/// The CSR arrays of one segment, or of several segments after `concat_segment_csrs`.
///
/// `out_*` / `in_*` hold outgoing / incoming adjacency as an offsets array plus an edge
/// array, and `edge_endpoints` holds the endpoints that edge indices refer to. The
/// `conn_type_index_*` and `peer_count_*` arrays are auxiliary indexes that are loaded
/// as optional (empty when their files are absent).
pub(crate) struct SegmentCsr {
pub(crate) node_slots: MmapOrVec<super::csr::DiskNodeSlot>,
/// Row boundaries into `out_edges`.
pub(crate) out_offsets: MmapOrVec<u64>,
pub(crate) out_edges: MmapOrVec<super::csr::CsrEdge>,
/// Row boundaries into `in_edges`.
pub(crate) in_offsets: MmapOrVec<u64>,
pub(crate) in_edges: MmapOrVec<super::csr::CsrEdge>,
pub(crate) edge_endpoints: MmapOrVec<super::csr::EdgeEndpoints>,
/// Connection types present in the index; row `i` spans
/// `conn_type_index_offsets[i]..conn_type_index_offsets[i + 1]` of the sources array.
pub(crate) conn_type_index_types: MmapOrVec<u64>,
pub(crate) conn_type_index_offsets: MmapOrVec<u64>,
pub(crate) conn_type_index_sources: MmapOrVec<u32>,
/// Types of the peer-count histogram; row `i` spans pairs
/// `peer_count_offsets[i]..peer_count_offsets[i + 1]`.
pub(crate) peer_count_types: MmapOrVec<u64>,
pub(crate) peer_count_offsets: MmapOrVec<u64>,
/// Flattened `(peer, count)` pairs: pair `p` is at indices `2p` and `2p + 1`.
pub(crate) peer_count_entries: MmapOrVec<u32>,
}
impl SegmentCsr {
    /// Loads one segment directory with every array's length inferred from its file.
    ///
    /// Each of the six adjacency arrays is read from `.bin` when that file holds at
    /// least one element, otherwise from `.bin.zst` (decompressed under `temp_dir`).
    /// The conn-type index and peer-count arrays are optional and come back empty
    /// when missing.
    ///
    /// # Errors
    /// Propagates I/O and decompression errors from the six required arrays.
    pub(crate) fn load_from(csr_dir: &Path, temp_dir: &Path) -> std::io::Result<Self> {
        let file = |stem: &str| csr_dir.join(stem);
        let node_slots = load_with_inferred_len(&file("node_slots"), temp_dir)?;
        let out_offsets = load_with_inferred_len(&file("out_offsets"), temp_dir)?;
        let out_edges = load_with_inferred_len(&file("out_edges"), temp_dir)?;
        let in_offsets = load_with_inferred_len(&file("in_offsets"), temp_dir)?;
        let in_edges = load_with_inferred_len(&file("in_edges"), temp_dir)?;
        let edge_endpoints = load_with_inferred_len(&file("edge_endpoints"), temp_dir)?;
        Ok(Self {
            node_slots,
            out_offsets,
            out_edges,
            in_offsets,
            in_edges,
            edge_endpoints,
            conn_type_index_types: load_raw_or_zst_optional(&file("conn_type_index_types")),
            conn_type_index_offsets: load_raw_or_zst_optional(&file("conn_type_index_offsets")),
            conn_type_index_sources: load_raw_or_zst_optional(&file("conn_type_index_sources")),
            peer_count_types: load_raw_or_zst_optional(&file("peer_count_types")),
            peer_count_offsets: load_raw_or_zst_optional(&file("peer_count_offsets")),
            peer_count_entries: load_raw_or_zst_optional(&file("peer_count_entries")),
        })
    }
}
/// Replaces `field` with `MmapOrVec::from_vec` of its current contents, then truncates
/// its former backing file (if it had one) to `seg0_len` elements.
///
/// The in-memory array keeps every element; only the file on disk shrinks.
///
/// # Errors
/// Propagates errors from opening or resizing the backing file.
fn reconcile_seg0_csr<T: Copy + Default + 'static>(
    field: &mut MmapOrVec<T>,
    seg0_len: usize,
) -> std::io::Result<()> {
    let backing = field.file_path().map(PathBuf::from);
    let snapshot = field.to_vec();
    *field = MmapOrVec::from_vec(snapshot);
    let Some(path) = backing else {
        return Ok(());
    };
    let byte_len = (seg0_len * std::mem::size_of::<T>()) as u64;
    std::fs::OpenOptions::new()
        .write(true)
        .open(&path)?
        .set_len(byte_len)
}
/// Loads `<base_path>.bin`, or else `<base_path>.bin.zst` (decompressed into
/// `temp_dir`). The element count is inferred from the (decompressed) byte length.
///
/// Several segment directories share one `temp_dir` and contain identically named
/// files. Decompressed copies are therefore named `<parent-dir>__<file>`. Using the
/// bare file name would let a later segment truncate and overwrite a cache file that an
/// earlier segment still has mapped.
///
/// Returns an empty array for a zero-sized `T`, when neither file exists, or when the
/// data holds no whole element.
///
/// # Errors
/// Propagates metadata, read, decompression, write, and mapping errors.
fn load_with_inferred_len<T: Copy + Default + 'static>(
    base_path: &Path,
    temp_dir: &Path,
) -> std::io::Result<MmapOrVec<T>> {
    let elem = std::mem::size_of::<T>();
    if elem == 0 {
        return Ok(MmapOrVec::new());
    }

    let raw_path = base_path.with_extension("bin");
    if raw_path.exists() {
        let len = std::fs::metadata(&raw_path)?.len() as usize / elem;
        if len > 0 {
            return MmapOrVec::load_mapped(&raw_path, len);
        }
    }

    let zst_path = base_path.with_extension("bin.zst");
    if !zst_path.exists() {
        return Ok(MmapOrVec::new());
    }
    std::fs::create_dir_all(temp_dir)?;
    let raw = zstd::decode_all(std::fs::read(&zst_path)?.as_slice())?;
    let file_name = zst_path
        .file_name()
        .and_then(|s| s.to_str())
        .unwrap_or("data")
        .trim_end_matches(".zst");
    // Qualify the cache name with the owning directory (e.g. the segment dir) so that
    // segments sharing `temp_dir` never clobber each other's mapped copies.
    let cache_name = match base_path
        .parent()
        .and_then(|p| p.file_name())
        .and_then(|s| s.to_str())
    {
        Some(dir_name) if !dir_name.is_empty() => format!("{dir_name}__{file_name}"),
        _ => file_name.to_string(),
    };
    let temp_path = temp_dir.join(cache_name);
    std::fs::write(&temp_path, &raw)?;
    let len = raw.len() / elem;
    if len > 0 {
        return MmapOrVec::load_mapped(&temp_path, len);
    }
    Ok(MmapOrVec::new())
}
pub(crate) fn concat_segment_csrs(mut segments: Vec<SegmentCsr>) -> SegmentCsr {
use super::csr::{CsrEdge, DiskNodeSlot, EdgeEndpoints};
match segments.len() {
0 => SegmentCsr {
node_slots: MmapOrVec::new(),
out_offsets: MmapOrVec::new(),
out_edges: MmapOrVec::new(),
in_offsets: MmapOrVec::new(),
in_edges: MmapOrVec::new(),
edge_endpoints: MmapOrVec::new(),
conn_type_index_types: MmapOrVec::new(),
conn_type_index_offsets: MmapOrVec::new(),
conn_type_index_sources: MmapOrVec::new(),
peer_count_types: MmapOrVec::new(),
peer_count_offsets: MmapOrVec::new(),
peer_count_entries: MmapOrVec::new(),
},
1 => segments.pop().unwrap(),
_ => {
let total_nodes: usize = segments.iter().map(|s| s.node_slots.len()).sum();
let total_out_edges: usize = segments.iter().map(|s| s.out_edges.len()).sum();
let total_in_edges: usize = segments.iter().map(|s| s.in_edges.len()).sum();
let total_endpoints: usize = segments.iter().map(|s| s.edge_endpoints.len()).sum();
let mut node_slots: MmapOrVec<DiskNodeSlot> = MmapOrVec::with_capacity(total_nodes);
let mut edge_endpoints: MmapOrVec<EdgeEndpoints> =
MmapOrVec::with_capacity(total_endpoints);
let mut out_offsets: MmapOrVec<u64> = MmapOrVec::with_capacity(total_nodes + 1);
let mut out_edges: MmapOrVec<CsrEdge> = MmapOrVec::with_capacity(total_out_edges);
let mut in_offsets: MmapOrVec<u64> = MmapOrVec::with_capacity(total_nodes + 1);
let mut in_edges: MmapOrVec<CsrEdge> = MmapOrVec::with_capacity(total_in_edges);
let mut node_lo: Vec<usize> = Vec::with_capacity(segments.len());
let mut node_hi: Vec<usize> = Vec::with_capacity(segments.len());
let mut endpoint_base: Vec<u32> = Vec::with_capacity(segments.len());
let mut is_full: Vec<bool> = Vec::with_capacity(segments.len());
let mut node_cursor = 0usize;
let mut ep_cursor: u32 = 0;
for seg in &segments {
node_lo.push(node_cursor);
node_cursor += seg.node_slots.len();
node_hi.push(node_cursor);
endpoint_base.push(ep_cursor);
ep_cursor += seg.edge_endpoints.len() as u32;
is_full.push(seg.out_offsets.len() > seg.node_slots.len() + 1);
}
for seg in &segments {
for i in 0..seg.node_slots.len() {
node_slots.push(seg.node_slots.get(i));
}
for i in 0..seg.edge_endpoints.len() {
edge_endpoints.push(seg.edge_endpoints.get(i));
}
}
out_offsets.push(0);
in_offsets.push(0);
for gid in 0..total_nodes {
for (k, seg) in segments.iter().enumerate() {
let key: Option<usize> = if is_full[k] {
if gid + 1 < seg.out_offsets.len() {
Some(gid)
} else {
None
}
} else if gid >= node_lo[k] && gid < node_hi[k] {
Some(gid - node_lo[k])
} else {
None
};
if let Some(key) = key {
let start = seg.out_offsets.get(key) as usize;
let end = seg.out_offsets.get(key + 1) as usize;
for i in start..end {
let mut e = seg.out_edges.get(i);
e.edge_idx = e.edge_idx.wrapping_add(endpoint_base[k]);
out_edges.push(e);
}
}
}
out_offsets.push(out_edges.len() as u64);
for (k, seg) in segments.iter().enumerate() {
let key: Option<usize> = if is_full[k] {
if gid + 1 < seg.in_offsets.len() {
Some(gid)
} else {
None
}
} else if gid >= node_lo[k] && gid < node_hi[k] {
Some(gid - node_lo[k])
} else {
None
};
if let Some(key) = key {
let start = seg.in_offsets.get(key) as usize;
let end = seg.in_offsets.get(key + 1) as usize;
for i in start..end {
let mut e = seg.in_edges.get(i);
e.edge_idx = e.edge_idx.wrapping_add(endpoint_base[k]);
in_edges.push(e);
}
}
}
in_offsets.push(in_edges.len() as u64);
}
let (cti_types, cti_offsets, cti_sources) = merge_conn_type_index(&segments);
let (pc_types, pc_offsets, pc_entries) = merge_peer_count_histogram(&segments);
SegmentCsr {
node_slots,
out_offsets,
out_edges,
in_offsets,
in_edges,
edge_endpoints,
conn_type_index_types: cti_types,
conn_type_index_offsets: cti_offsets,
conn_type_index_sources: cti_sources,
peer_count_types: pc_types,
peer_count_offsets: pc_offsets,
peer_count_entries: pc_entries,
}
}
}
}
/// Merges per-segment connection-type indexes into one index.
///
/// Output types are ascending (BTreeMap order); `offsets[i]..offsets[i + 1]` delimit
/// the sources of `types[i]`. Sources are concatenated in segment order. Segments with
/// local offsets have their sources shifted by the segment's first global node id;
/// globally indexed segments (offsets longer than `node_slots.len() + 1`) are copied
/// as-is.
///
/// Each segment's row for a type is recorded while building the type map, so the
/// merge is linear in the number of index rows.
///
/// NOTE(review): sources from different segments are neither re-sorted nor
/// de-duplicated. Confirm consumers do not rely on a sorted or unique list.
fn merge_conn_type_index(
    segments: &[SegmentCsr],
) -> (MmapOrVec<u64>, MmapOrVec<u64>, MmapOrVec<u32>) {
    use std::collections::BTreeMap;

    // type -> every (segment index, source shift, row in that segment's index).
    let mut rows_by_type: BTreeMap<u64, Vec<(usize, u32, usize)>> = BTreeMap::new();
    let mut node_lo: u32 = 0;
    for (si, seg) in segments.iter().enumerate() {
        let is_full = seg.out_offsets.len() > seg.node_slots.len() + 1;
        let shift = if is_full { 0 } else { node_lo };
        for row in 0..seg.conn_type_index_types.len() {
            let t = seg.conn_type_index_types.get(row);
            rows_by_type.entry(t).or_default().push((si, shift, row));
        }
        node_lo += seg.node_slots.len() as u32;
    }

    let total_sources: usize = segments
        .iter()
        .map(|s| s.conn_type_index_sources.len())
        .sum();
    let mut out_types: MmapOrVec<u64> = MmapOrVec::with_capacity(rows_by_type.len());
    let mut out_offsets: MmapOrVec<u64> = MmapOrVec::with_capacity(rows_by_type.len() + 1);
    let mut out_sources: MmapOrVec<u32> = MmapOrVec::with_capacity(total_sources);
    let mut cur_off: u64 = 0;
    for (&t, rows) in &rows_by_type {
        out_types.push(t);
        out_offsets.push(cur_off);
        for &(si, shift, row) in rows {
            let seg = &segments[si];
            let start = seg.conn_type_index_offsets.get(row) as usize;
            let end = seg.conn_type_index_offsets.get(row + 1) as usize;
            for k in start..end {
                out_sources.push(seg.conn_type_index_sources.get(k) + shift);
            }
            cur_off += (end - start) as u64;
        }
    }
    out_offsets.push(cur_off);
    (out_types, out_offsets, out_sources)
}
/// Merges per-segment peer-count histograms by summing counts per `(type, peer)`.
///
/// Layout (input and output): `types[i]` owns pairs `offsets[i]..offsets[i + 1]`, and
/// pair `p` is stored as `entries[2p]` (peer) followed by `entries[2p + 1]` (count).
/// Output types and peers are ascending; summed counts saturate at `u32::MAX`.
fn merge_peer_count_histogram(
    segments: &[SegmentCsr],
) -> (MmapOrVec<u64>, MmapOrVec<u64>, MmapOrVec<u32>) {
    use std::collections::BTreeMap;

    let mut totals: BTreeMap<u64, BTreeMap<u32, u64>> = BTreeMap::new();
    for seg in segments {
        for row in 0..seg.peer_count_types.len() {
            let ty = seg.peer_count_types.get(row);
            let first_pair = seg.peer_count_offsets.get(row) as usize;
            let end_pair = seg.peer_count_offsets.get(row + 1) as usize;
            let per_peer = totals.entry(ty).or_default();
            for pair in first_pair..end_pair {
                let peer = seg.peer_count_entries.get(2 * pair);
                let count = u64::from(seg.peer_count_entries.get(2 * pair + 1));
                *per_peer.entry(peer).or_insert(0) += count;
            }
        }
    }

    let mut out_types: MmapOrVec<u64> = MmapOrVec::with_capacity(totals.len());
    let mut out_offsets: MmapOrVec<u64> = MmapOrVec::with_capacity(totals.len() + 1);
    let mut out_entries: MmapOrVec<u32> = MmapOrVec::new();
    let mut pairs_written: u64 = 0;
    for (&ty, per_peer) in &totals {
        out_types.push(ty);
        out_offsets.push(pairs_written);
        for (&peer, &count) in per_peer {
            out_entries.push(peer);
            out_entries.push(u32::try_from(count).unwrap_or(u32::MAX));
        }
        pairs_written += per_peer.len() as u64;
    }
    out_offsets.push(pairs_written);
    (out_types, out_offsets, out_entries)
}