use super::csr::{CsrEdge, EdgeEndpoints, MergeSortEntry};
use crate::graph::storage::mapped::mmap_vec::MmapOrVec;
use std::collections::HashMap;
use std::path::Path;
/// Tuning knobs for the CSR builder's chunked merge sort.
///
/// Both fields are `None` unless explicitly configured; see
/// [`BuilderConfig::from_env`] for the environment-driven constructor.
#[derive(Debug, Default, Clone)]
pub(super) struct BuilderConfig {
    /// Per-chunk memory budget in MiB (`KGLITE_CSR_CHUNK_MB`).
    pub(super) chunk_mb_override: Option<usize>,
    /// Requested number of sort chunks (`KGLITE_CSR_FORCE_CHUNKS`); when set it
    /// takes precedence over `chunk_mb_override`.
    pub(super) force_chunks: Option<usize>,
}

impl BuilderConfig {
    /// Reads the builder configuration from the process environment.
    ///
    /// `KGLITE_CSR_CHUNK_MB` populates `chunk_mb_override` and
    /// `KGLITE_CSR_FORCE_CHUNKS` populates `force_chunks`. A variable that is
    /// unset, not valid unicode, not a `usize`, or equal to zero leaves its
    /// field as `None`.
    pub(super) fn from_env() -> Self {
        Self {
            chunk_mb_override: Self::positive_env_var("KGLITE_CSR_CHUNK_MB"),
            force_chunks: Self::positive_env_var("KGLITE_CSR_FORCE_CHUNKS"),
        }
    }

    /// Parses the environment variable `name` as a strictly positive `usize`.
    fn positive_env_var(name: &str) -> Option<usize> {
        match std::env::var(name).ok()?.parse::<usize>() {
            Ok(value) if value > 0 => Some(value),
            _ => None,
        }
    }
}
/// Everything produced by `build_csr_files` for one CSR build.
///
/// NOTE: in Rust, field declaration order is also drop order, so the fields
/// are deliberately left in this sequence to keep the release order of the
/// underlying buffers/mappings unchanged.
pub(super) struct CsrArtifacts {
    /// One `EdgeEndpoints` per pending edge, in pending order
    /// (`edge_endpoints.bin` in the build directory).
    pub(super) edge_endpoints: MmapOrVec<EdgeEndpoints>,
    /// `node_bound + 1` running sums of per-node out-degree, starting at 0
    /// (`out_offsets.bin`).
    pub(super) out_offsets: MmapOrVec<u64>,
    /// `node_bound + 1` running sums of per-node in-degree, starting at 0
    /// (`in_offsets.bin`).
    pub(super) in_offsets: MmapOrVec<u64>,
    /// Edges sorted by (source, connection type); each entry stores the
    /// target as its peer (`out_edges.bin`).
    pub(super) out_edges: MmapOrVec<CsrEdge>,
    /// Edges sorted by (target, connection type); each entry stores the
    /// source as its peer (`in_edges.bin`).
    pub(super) in_edges: MmapOrVec<CsrEdge>,
    /// Number of pending edges per connection type.
    pub(super) edge_type_counts: HashMap<u64, usize>,
}
/// Builds every CSR artifact for `edge_count` pending
/// `(source, target, connection_type)` edges.
///
/// The work runs in four phases, each timed to stderr when `verbose` is set:
///
/// 1. Copy each pending triple into `edge_endpoints.bin`. The same pass tallies
///    per-node out/in degrees and per-connection-type edge counts. Endpoint
///    ids `>= node_bound` are still copied and counted by type, but they add
///    to no node's degree.
/// 2. Turn the degree tallies into `node_bound + 1` running offsets that start
///    at 0 (`out_offsets.bin`, `in_offsets.bin`).
/// 3. Produce `out_edges` (keyed by source) via `merge_sort_build`.
/// 4. Produce `in_edges` (keyed by target) via `merge_sort_build`.
///
/// Artifacts live under `build_dir`. Each one is memory-mapped when possible;
/// if mapping fails it falls back to `MmapOrVec::with_capacity` (presumably
/// heap-backed). `tmp_dir` is passed to the sort as its chunk spill directory.
pub(super) fn build_csr_files(
    pending: &MmapOrVec<(u32, u32, u64)>,
    edge_count: usize,
    node_bound: usize,
    build_dir: &Path,
    tmp_dir: &Path,
    config: &BuilderConfig,
    verbose: bool,
) -> CsrArtifacts {
    // Phase 1: a single pass over the pending edges.
    let timer = std::time::Instant::now();
    let mut edge_endpoints =
        MmapOrVec::mapped(&build_dir.join("edge_endpoints.bin"), edge_count)
            .unwrap_or_else(|_| MmapOrVec::with_capacity(edge_count));
    let mut out_degree = vec![0u64; node_bound];
    let mut in_degree = vec![0u64; node_bound];
    let mut edge_type_counts: HashMap<u64, usize> = HashMap::new();
    for idx in 0..edge_count {
        let (source, target, connection_type) = pending.get(idx);
        edge_endpoints.push(EdgeEndpoints {
            source,
            target,
            connection_type,
        });
        // `get_mut` yields None for ids outside 0..node_bound; those are skipped.
        if let Some(deg) = out_degree.get_mut(source as usize) {
            *deg += 1;
        }
        if let Some(deg) = in_degree.get_mut(target as usize) {
            *deg += 1;
        }
        *edge_type_counts.entry(connection_type).or_default() += 1;
    }
    if verbose {
        eprintln!(
            " CSR step 1/4: endpoints + degrees ({:.1}s)",
            timer.elapsed().as_secs_f64()
        );
    }

    // Phase 2: exclusive prefix sums per direction, followed by the grand total.
    let timer = std::time::Instant::now();
    let mut out_offsets: MmapOrVec<u64> =
        MmapOrVec::mapped(&build_dir.join("out_offsets.bin"), node_bound + 1)
            .unwrap_or_else(|_| MmapOrVec::with_capacity(node_bound + 1));
    let mut in_offsets: MmapOrVec<u64> =
        MmapOrVec::mapped(&build_dir.join("in_offsets.bin"), node_bound + 1)
            .unwrap_or_else(|_| MmapOrVec::with_capacity(node_bound + 1));
    let (mut out_running, mut in_running) = (0u64, 0u64);
    for (out_deg, in_deg) in out_degree.iter().zip(&in_degree) {
        out_offsets.push(out_running);
        in_offsets.push(in_running);
        out_running += *out_deg;
        in_running += *in_deg;
    }
    out_offsets.push(out_running);
    in_offsets.push(in_running);
    // The tallies are no longer needed; release them before the sorts.
    drop(out_degree);
    drop(in_degree);
    if verbose {
        eprintln!(
            " CSR step 2/4: build offsets ({:.1}s)",
            timer.elapsed().as_secs_f64()
        );
    }

    // Phase 3: adjacency keyed by source.
    let timer = std::time::Instant::now();
    let out_edges = merge_sort_build(
        pending, edge_count, true, tmp_dir, build_dir, "out", config, verbose,
    );
    if verbose {
        eprintln!(
            " CSR step 3/4: out_edges merge sort ({:.1}s)",
            timer.elapsed().as_secs_f64()
        );
    }

    // Phase 4: adjacency keyed by target.
    let timer = std::time::Instant::now();
    let in_edges = merge_sort_build(
        pending, edge_count, false, tmp_dir, build_dir, "in", config, verbose,
    );
    if verbose {
        eprintln!(
            " CSR step 4/4: in_edges merge sort ({:.1}s)",
            timer.elapsed().as_secs_f64()
        );
    }

    CsrArtifacts {
        edge_endpoints,
        out_offsets,
        in_offsets,
        out_edges,
        in_edges,
        edge_type_counts,
    }
}
/// Builds one CSR edge array, `{label}_edges.bin` in `output_dir`, using a
/// chunked (external) merge sort over the pending edges.
///
/// Each pending edge `(src, tgt, ct)` at index `i` becomes a sort entry. When
/// `by_source` is true its key is `src` and its peer is `tgt`; otherwise the
/// key is `tgt` and the peer is `src`. The output holds one
/// `CsrEdge { peer, edge_idx: i }` per edge, ordered by `(key, ct, i)`. Ties are
/// broken on the original index, so the result is identical no matter how
/// many chunks are used.
///
/// Chunk sizing:
/// - `config.force_chunks = Some(n)` with `n > 0` splits the input into at most
///   `n` roughly equal chunks;
/// - otherwise each chunk holds at most `config.chunk_mb_override` MiB
///   (default 12 GiB) worth of `MergeSortEntry` values.
///
/// With zero or one chunk, all entries are sorted in memory and written
/// directly. Otherwise:
/// 1. Each chunk is sorted and spilled to `chunk_dir/chunk_{label}_{c}.bin`.
/// 2. The sorted runs are k-way merged through a min-heap.
/// 3. The spill files are deleted (best effort) after their mappings are
///    released.
///
/// Any output or spill file that cannot be memory-mapped falls back to
/// `MmapOrVec::with_capacity`. Edge indices are stored as `u32`, so
/// `edge_count` must not exceed `u32::MAX + 1`; this is checked in debug
/// builds.
#[allow(clippy::too_many_arguments)]
fn merge_sort_build(
    pending: &MmapOrVec<(u32, u32, u64)>,
    edge_count: usize,
    by_source: bool,
    chunk_dir: &Path,
    output_dir: &Path,
    label: &str,
    config: &BuilderConfig,
    verbose: bool,
) -> MmapOrVec<CsrEdge> {
    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    // Default per-chunk memory budget when no override is configured (12 GiB).
    // Kept as u64 so it does not silently truncate on 32-bit targets.
    const DEFAULT_CHUNK_BYTES: u64 = 12 << 30;

    debug_assert!(
        (edge_count as u64) <= u64::from(u32::MAX) + 1,
        "edge indices are stored as u32; {edge_count} edges do not fit"
    );

    let (chunk_size, num_chunks) = match config.force_chunks.filter(|&n| n > 0) {
        Some(forced) => {
            // `.max(1)` keeps the divisor non-zero when there are no edges.
            let cs = edge_count.div_ceil(forced).max(1);
            (cs, forced.min(edge_count.div_ceil(cs)))
        }
        None => {
            let max_bytes = match config.chunk_mb_override.filter(|&mb| mb > 0) {
                // Saturate instead of letting a huge override wrap to zero.
                Some(mb) => mb.saturating_mul(1 << 20),
                // Clamp before the cast: the lossless result is usize::MAX on
                // 32-bit targets rather than a truncated 0.
                None => DEFAULT_CHUNK_BYTES.min(usize::MAX as u64) as usize,
            };
            let max_entries = max_bytes / std::mem::size_of::<MergeSortEntry>();
            // `.max(1)` avoids `div_ceil(0)` panicking when there are no edges.
            let cs = max_entries.min(edge_count).max(1);
            (cs, edge_count.div_ceil(cs))
        }
    };

    // Sort entry for pending edge `i`; `i as u32` is guarded by the debug_assert.
    let entry_at = |i: usize| {
        let (src, tgt, ct) = pending.get(i);
        let (key, peer) = if by_source { (src, tgt) } else { (tgt, src) };
        MergeSortEntry {
            key,
            conn_type: ct,
            peer,
            orig_idx: i as u32,
        }
    };
    let output_path = output_dir.join(format!("{}_edges.bin", label));

    if num_chunks <= 1 {
        let step = std::time::Instant::now();
        let mut entries: Vec<MergeSortEntry> = (0..edge_count).map(&entry_at).collect();
        // `orig_idx` makes the ordering total, so the unstable sort is deterministic.
        entries.sort_unstable_by_key(|e| (e.key, e.conn_type, e.orig_idx));
        let mut output = MmapOrVec::mapped(&output_path, edge_count)
            .unwrap_or_else(|_| MmapOrVec::with_capacity(edge_count));
        for entry in &entries {
            output.push(CsrEdge {
                peer: entry.peer,
                edge_idx: entry.orig_idx,
            });
        }
        drop(entries);
        if verbose {
            eprintln!(
                " {label} single-chunk sort+write: {:.1}s",
                step.elapsed().as_secs_f64()
            );
        }
        return output;
    }

    // Phase 1: sort each chunk independently and spill it.
    let step = std::time::Instant::now();
    let chunk_path = |c: usize| chunk_dir.join(format!("chunk_{}_{}.bin", label, c));
    let mut chunk_mmaps: Vec<MmapOrVec<MergeSortEntry>> = Vec::with_capacity(num_chunks);
    let mut chunk_lens: Vec<usize> = Vec::with_capacity(num_chunks);
    for c in 0..num_chunks {
        let start = c * chunk_size;
        let end = (start + chunk_size).min(edge_count);
        let len = end - start;
        let mut chunk: Vec<MergeSortEntry> = (start..end).map(&entry_at).collect();
        chunk.sort_unstable_by_key(|e| (e.key, e.conn_type, e.orig_idx));
        let mut mmap: MmapOrVec<MergeSortEntry> = MmapOrVec::mapped(&chunk_path(c), len)
            .unwrap_or_else(|_| MmapOrVec::with_capacity(len));
        for entry in &chunk {
            mmap.push(*entry);
        }
        chunk_mmaps.push(mmap);
        chunk_lens.push(len);
    }
    if verbose {
        eprintln!(
            " {label} sort {num_chunks} chunks: {:.1}s",
            step.elapsed().as_secs_f64()
        );
    }

    // Phase 2: k-way merge.
    let merge_start = std::time::Instant::now();
    let mut positions: Vec<usize> = vec![0; num_chunks];
    let mut output = MmapOrVec::mapped(&output_path, edge_count)
        .unwrap_or_else(|_| MmapOrVec::with_capacity(edge_count));
    // Min-heap of each chunk's current head, keyed by (key, conn_type, chunk).
    // Chunks cover ascending index ranges, so popping the lowest chunk on ties
    // keeps the original-index order.
    let mut heap: BinaryHeap<Reverse<(u32, u64, usize)>> = BinaryHeap::with_capacity(num_chunks);
    for (c, chunk) in chunk_mmaps.iter().enumerate() {
        if chunk_lens[c] > 0 {
            let head = chunk.get(0);
            heap.push(Reverse((head.key, head.conn_type, c)));
        }
    }
    for _ in 0..edge_count {
        let Reverse((_, _, c)) = heap
            .pop()
            .expect("chunk lengths sum to edge_count, so the heap cannot run dry");
        let entry = chunk_mmaps[c].get(positions[c]);
        positions[c] += 1;
        output.push(CsrEdge {
            peer: entry.peer,
            edge_idx: entry.orig_idx,
        });
        if positions[c] < chunk_lens[c] {
            let next = chunk_mmaps[c].get(positions[c]);
            heap.push(Reverse((next.key, next.conn_type, c)));
        }
    }

    // Release the spill mappings BEFORE deleting their files: some platforms
    // (e.g. Windows) refuse to delete a file that is still memory-mapped.
    drop(chunk_mmaps);
    for c in 0..num_chunks {
        // Best effort: a chunk that fell back to memory may have no file on disk.
        let _ = std::fs::remove_file(chunk_path(c));
    }
    if verbose {
        eprintln!(
            " {label} merge: {:.1}s",
            merge_start.elapsed().as_secs_f64()
        );
    }
    output
}