use interstellar::prelude::*;
use interstellar::storage::Graph;
use std::alloc::{GlobalAlloc, Layout, System};
use std::collections::HashMap;
use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering};
struct TrackingAllocator {
inner: System,
allocated: AtomicIsize,
peak: AtomicUsize,
allocation_count: AtomicUsize,
}
impl TrackingAllocator {
const fn new() -> Self {
Self {
inner: System,
allocated: AtomicIsize::new(0),
peak: AtomicUsize::new(0),
allocation_count: AtomicUsize::new(0),
}
}
fn reset(&self) {
self.allocated.store(0, Ordering::SeqCst);
self.peak.store(0, Ordering::SeqCst);
self.allocation_count.store(0, Ordering::SeqCst);
}
fn allocated(&self) -> usize {
self.allocated.load(Ordering::SeqCst).max(0) as usize
}
fn peak(&self) -> usize {
self.peak.load(Ordering::SeqCst)
}
fn allocation_count(&self) -> usize {
self.allocation_count.load(Ordering::SeqCst)
}
}
unsafe impl GlobalAlloc for TrackingAllocator {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
let ptr = self.inner.alloc(layout);
if !ptr.is_null() {
let size = layout.size() as isize;
let new_val = self.allocated.fetch_add(size, Ordering::SeqCst) + size;
if new_val > 0 {
let new_usize = new_val as usize;
let mut peak = self.peak.load(Ordering::SeqCst);
while new_usize > peak {
match self.peak.compare_exchange_weak(
peak,
new_usize,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => break,
Err(p) => peak = p,
}
}
}
self.allocation_count.fetch_add(1, Ordering::SeqCst);
}
ptr
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
self.allocated
.fetch_sub(layout.size() as isize, Ordering::SeqCst);
self.inner.dealloc(ptr, layout);
}
unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
let new_ptr = self.inner.realloc(ptr, layout, new_size);
if !new_ptr.is_null() {
let old_size = layout.size() as isize;
let new_size_signed = new_size as isize;
let diff = new_size_signed - old_size;
let new_val = self.allocated.fetch_add(diff, Ordering::SeqCst) + diff;
if diff > 0 && new_val > 0 {
let new_usize = new_val as usize;
let mut peak = self.peak.load(Ordering::SeqCst);
while new_usize > peak {
match self.peak.compare_exchange_weak(
peak,
new_usize,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => break,
Err(p) => peak = p,
}
}
}
}
new_ptr
}
}
#[global_allocator]
static ALLOCATOR: TrackingAllocator = TrackingAllocator::new();
fn create_test_graph(num_vertices: usize) -> Graph {
let graph = Graph::new();
for i in 0..num_vertices {
let mut props = HashMap::new();
props.insert("name".to_string(), Value::String(format!("person_{}", i)));
props.insert("age".to_string(), Value::Int((i % 100) as i64));
props.insert(
"bio".to_string(),
Value::String(format!(
"This is a longer biography for person {} to increase memory usage per vertex.",
i
)),
);
graph.add_vertex("person", props);
}
graph
}
#[test]
fn compare_memory_streaming_vs_eager() {
let graph = create_test_graph(10_000);
let snapshot = graph.snapshot();
println!("\n{}", "=".repeat(60));
println!("Memory Usage Comparison: Streaming vs Eager");
println!("Graph: 10,000 vertices with properties");
println!("{}\n", "=".repeat(60));
let eager_net;
{
ALLOCATOR.reset();
let g = snapshot.gremlin();
std::hint::black_box(());
let start_alloc = ALLOCATOR.allocated();
let start_count = ALLOCATOR.allocation_count();
let result: Vec<_> = g
.v()
.values("name")
.to_list()
.into_iter()
.take(10)
.collect();
let peak = ALLOCATOR.peak();
let end_alloc = ALLOCATOR.allocated();
let end_count = ALLOCATOR.allocation_count();
eager_net = end_alloc.saturating_sub(start_alloc);
println!("EAGER (to_list + take 10):");
println!(" Result count: {}", result.len());
println!(" Allocations: {}", end_count - start_count);
println!(
" Peak memory: {} bytes ({:.2} KB)",
peak,
peak as f64 / 1024.0
);
println!(" Net allocated: {} bytes", eager_net);
println!();
}
let streaming_net;
{
ALLOCATOR.reset();
let g = snapshot.gremlin();
std::hint::black_box(());
let start_alloc = ALLOCATOR.allocated();
let start_count = ALLOCATOR.allocation_count();
let result: Vec<_> = g.v().values("name").iter().take(10).collect();
let peak = ALLOCATOR.peak();
let end_alloc = ALLOCATOR.allocated();
let end_count = ALLOCATOR.allocation_count();
streaming_net = end_alloc.saturating_sub(start_alloc);
println!("STREAMING (iter + take 10):");
println!(" Result count: {}", result.len());
println!(" Allocations: {}", end_count - start_count);
println!(
" Peak memory: {} bytes ({:.2} KB)",
peak,
peak as f64 / 1024.0
);
println!(" Net allocated: {} bytes", streaming_net);
println!();
}
if eager_net > 0 && streaming_net > 0 {
let ratio = eager_net as f64 / streaming_net as f64;
println!(
"RESULT: Streaming uses {:.0}x less memory for early termination!\n",
ratio
);
}
println!("--- Full Collection (all 10,000 values) ---\n");
{
ALLOCATOR.reset();
let g = snapshot.gremlin();
std::hint::black_box(());
let start_count = ALLOCATOR.allocation_count();
let result = g.v().values("name").to_list();
let peak_eager = ALLOCATOR.peak();
let count_eager = ALLOCATOR.allocation_count() - start_count;
println!("EAGER (to_list all):");
println!(" Result count: {}", result.len());
println!(" Allocations: {}", count_eager);
println!(
" Peak memory: {} bytes ({:.2} KB)",
peak_eager,
peak_eager as f64 / 1024.0
);
drop(result);
}
{
ALLOCATOR.reset();
let g = snapshot.gremlin();
std::hint::black_box(());
let start_count = ALLOCATOR.allocation_count();
let result: Vec<_> = g.v().values("name").iter().collect();
let peak_streaming = ALLOCATOR.peak();
let count_streaming = ALLOCATOR.allocation_count() - start_count;
println!("STREAMING (iter collect all):");
println!(" Result count: {}", result.len());
println!(" Allocations: {}", count_streaming);
println!(
" Peak memory: {} bytes ({:.2} KB)",
peak_streaming,
peak_streaming as f64 / 1024.0
);
drop(result);
}
println!("\n{}", "=".repeat(60));
}
#[test]
fn streaming_constant_memory_per_element() {
let graph = create_test_graph(10_000);
let snapshot = graph.snapshot();
println!("\n{}", "=".repeat(60));
println!("Streaming Memory Per Element (not collecting)");
println!("{}\n", "=".repeat(60));
for count in [10, 100, 1000, 5000] {
ALLOCATOR.reset();
let g = snapshot.gremlin();
let start_peak = ALLOCATOR.peak();
let mut processed = 0;
for value in g.v().values("name").iter().take(count) {
std::hint::black_box(&value);
processed += 1;
}
let peak = ALLOCATOR.peak();
let mem_per_element = if processed > 0 {
(peak - start_peak) as f64 / processed as f64
} else {
0.0
};
println!(
"Processed {} elements: peak {} bytes ({:.1} bytes/element)",
processed, peak, mem_per_element
);
}
println!("\nNote: Memory per element should be roughly constant (O(1))");
println!("{}\n", "=".repeat(60));
}