use std::{
alloc::{GlobalAlloc, Layout, System},
hint::black_box,
sync::{
OnceLock,
atomic::{AtomicU64, Ordering},
},
time::Instant,
};
use datum::{Keep, Materializer, NotUsed, RunnableGraph, Sink, Source};
/// Global allocator that forwards every request to [`System`] while tallying how many bytes
/// were handed out, so the benchmark can report allocation volume per operation.
struct CountingAllocator;

/// Running total of bytes handed out by [`CountingAllocator`].
///
/// Deallocations never decrement it, so this measures allocation *volume*, not live heap size.
/// `main` resets it to zero right before each measured scenario.
static ALLOCATED_BYTES: AtomicU64 = AtomicU64::new(0);

impl CountingAllocator {
    /// Adds `bytes` to [`ALLOCATED_BYTES`] when `block` is non-null, then returns `block`.
    ///
    /// `Relaxed` is sufficient: the counter is a standalone statistic and is not used to order
    /// any other memory accesses. This helper must not allocate (it runs inside the allocator).
    fn tally(block: *mut u8, bytes: usize) -> *mut u8 {
        if !block.is_null() {
            ALLOCATED_BYTES.fetch_add(bytes as u64, Ordering::Relaxed);
        }
        block
    }
}

unsafe impl GlobalAlloc for CountingAllocator {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds `GlobalAlloc::alloc`'s contract for `layout`, and we forward
        // that same layout to `System` unchanged.
        Self::tally(unsafe { System.alloc(layout) }, layout.size())
    }

    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: same contract as `alloc`; the layout is forwarded to `System` unchanged.
        Self::tally(unsafe { System.alloc_zeroed(layout) }, layout.size())
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: every block this allocator hands out comes from `System` with the same layout,
        // so returning `ptr`/`layout` to `System` satisfies its `dealloc` contract.
        unsafe { System.dealloc(ptr, layout) };
    }

    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: `ptr` was obtained from `System` via this allocator with `layout`, and the
        // caller upholds `realloc`'s requirements on `new_size`.
        let resized = unsafe { System.realloc(ptr, layout, new_size) };
        if resized.is_null() {
            return resized;
        }
        let fresh_bytes = if resized == ptr {
            // Resized in place: only growth hands out new bytes; shrinking adds nothing.
            new_size.saturating_sub(layout.size())
        } else {
            // Moved: the whole new block counts as a fresh allocation.
            new_size
        };
        if fresh_bytes > 0 {
            Self::tally(resized, fresh_bytes);
        }
        resized
    }
}

/// Route every heap allocation in this binary through the counting wrapper.
#[global_allocator]
static GLOBAL: CountingAllocator = CountingAllocator;
/// Returns the user + system CPU time consumed so far by this process, in nanoseconds.
///
/// The value comes from fields 14 (`utime`) and 15 (`stime`) of `/proc/self/stat`. The function
/// returns 0 when that file cannot be read or does not have enough fields (for example on
/// platforms without procfs).
fn process_cpu_ns() -> u128 {
    // NOTE(review): assumes a clock tick of 10 ms (USER_HZ == 100, the usual Linux value); the
    // authoritative value is sysconf(_SC_CLK_TCK). Resolution is therefore one tick.
    const NS_PER_TICK: u128 = 10_000_000;

    let stat = match std::fs::read_to_string("/proc/self/stat") {
        Ok(text) => text,
        Err(_) => return 0,
    };
    // Field 2 (the command name) is parenthesised and may contain spaces, so fields are counted
    // from just past the *last* ')'.
    let after_comm = match stat.rfind(')') {
        Some(close) => &stat[close + 1..],
        None => return 0,
    };
    // Index 0 after the command name is field 3 (`state`), so utime/stime are indices 11 and 12.
    let mut tail = after_comm.split_whitespace().skip(11);
    match (tail.next(), tail.next()) {
        (Some(user), Some(system)) => {
            let user_ticks: u64 = user.parse().unwrap_or(0);
            let system_ticks: u64 = system.parse().unwrap_or(0);
            (u128::from(user_ticks) + u128::from(system_ticks)) * NS_PER_TICK
        }
        _ => 0,
    }
}
/// One benchmark case run by `main`.
struct Scenario {
    /// Function under test; `main` folds its return value into a checksum.
    run: fn() -> u64,
    /// Number of timed calls, made after the warm-up calls.
    iterations: u64,
    /// Row label printed in the first TSV column.
    name: &'static str,
}
fn main() {
let scenarios = [
Scenario {
name: "graph_construction_1_stages",
iterations: 50_000,
run: graph_construction_1_stages,
},
Scenario {
name: "graph_construction_10_stages",
iterations: 20_000,
run: graph_construction_10_stages,
},
Scenario {
name: "graph_construction_100_stages",
iterations: 5_000,
run: graph_construction_100_stages,
},
Scenario {
name: "graph_construction_1000_stages",
iterations: 500,
run: graph_construction_1000_stages,
},
Scenario {
name: "materialization_latency_1_operators",
iterations: 5_000,
run: materialization_latency_1_operators,
},
Scenario {
name: "materialization_latency_10_operators",
iterations: 5_000,
run: materialization_latency_10_operators,
},
Scenario {
name: "materialization_latency_100_operators",
iterations: 2_000,
run: materialization_latency_100_operators,
},
Scenario {
name: "materialized_value_keep_left",
iterations: 5_000,
run: materialized_value_keep_left,
},
Scenario {
name: "materialized_value_keep_right",
iterations: 5_000,
run: materialized_value_keep_right,
},
Scenario {
name: "materialized_value_keep_both",
iterations: 5_000,
run: materialized_value_keep_both,
},
Scenario {
name: "materialized_value_keep_none",
iterations: 5_000,
run: materialized_value_keep_none,
},
Scenario {
name: "materialized_value_custom",
iterations: 5_000,
run: materialized_value_custom,
},
Scenario {
name: "eager_source_first_demand",
iterations: 5_000,
run: eager_source_first_demand,
},
Scenario {
name: "lazy_source_first_demand",
iterations: 5_000,
run: lazy_source_first_demand,
},
Scenario {
name: "sink_terminal_ignore",
iterations: 4_000,
run: sink_terminal_ignore,
},
Scenario {
name: "sink_terminal_head",
iterations: 4_000,
run: sink_terminal_head,
},
Scenario {
name: "sink_terminal_fold",
iterations: 4_000,
run: sink_terminal_fold,
},
Scenario {
name: "sink_terminal_foreach",
iterations: 4_000,
run: sink_terminal_foreach,
},
Scenario {
name: "repeated_materialization",
iterations: 4_000,
run: repeated_materialization,
},
];
println!("scenario\titerations\tns_per_op\tallocated_bytes_per_op\tcpu_ns_per_op");
for scenario in scenarios {
for _ in 0..10 {
black_box((scenario.run)());
}
let cpu_start = process_cpu_ns();
ALLOCATED_BYTES.store(0, Ordering::Relaxed);
let started = Instant::now();
let mut checksum = 0_u64;
for _ in 0..scenario.iterations {
checksum = checksum.wrapping_add(black_box((scenario.run)()));
}
let elapsed = started.elapsed();
let allocated_bytes = ALLOCATED_BYTES.load(Ordering::Relaxed);
let cpu_delta = process_cpu_ns().saturating_sub(cpu_start);
black_box(checksum);
let ns_per_op = elapsed.as_nanos() as f64 / scenario.iterations as f64;
let allocated_bytes_per_op = allocated_bytes as f64 / scenario.iterations as f64;
let cpu_ns_per_op = cpu_delta as f64 / scenario.iterations as f64;
println!(
"{}\t{}\t{ns_per_op:.2}\t{allocated_bytes_per_op:.2}\t{cpu_ns_per_op:.2}",
scenario.name, scenario.iterations
);
}
}
/// Returns the single process-wide [`Materializer`]. It is created lazily on first call and
/// reused by every scenario afterwards.
fn shared_materializer() -> &'static Materializer {
    static INSTANCE: OnceLock<Materializer> = OnceLock::new();
    INSTANCE.get_or_init(Materializer::new)
}
/// Builds (without running) a graph that emits the single value `0_u64`.
///
/// The value passes through `stages` chained `map` stages, each adding 1 with wrapping, and
/// ends in [`Sink::ignore`]. The graph keeps the source's `NotUsed` materialized value.
fn ignore_graph(stages: usize) -> RunnableGraph<NotUsed> {
    let seed = Source::from_iter(0_u64..1);
    let chained = (0..stages).fold(seed, |chain, _| chain.map(|item| item.wrapping_add(1)));
    chained.to(Sink::ignore())
}
/// Construction-only benchmark body: builds and drops an `ignore_graph` with `stages` map stages
/// without running it. Returns `stages` so the harness checksum depends on the input.
fn build_construction(stages: usize) -> u64 {
    // Passing the graph through `black_box` by value and by reference keeps the optimizer from
    // eliding its construction.
    black_box(&black_box(ignore_graph(stages)));
    stages as u64
}
/// Graph construction cost with a single `map` stage.
fn graph_construction_1_stages() -> u64 {
    const STAGES: usize = 1;
    build_construction(STAGES)
}

/// Graph construction cost with 10 chained `map` stages.
fn graph_construction_10_stages() -> u64 {
    const STAGES: usize = 10;
    build_construction(STAGES)
}

/// Graph construction cost with 100 chained `map` stages.
fn graph_construction_100_stages() -> u64 {
    const STAGES: usize = 100;
    build_construction(STAGES)
}

/// Graph construction cost with 1,000 chained `map` stages.
fn graph_construction_1000_stages() -> u64 {
    const STAGES: usize = 1_000;
    build_construction(STAGES)
}
/// Runs the graph cached in `cell` on the shared materializer and returns 0.
///
/// On first use, the graph is built with `ignore_graph(stages)`. Only the `run_with_materializer`
/// call is on the hot path, so this isolates materialization cost from construction cost.
/// NOTE(review): the stream's completion is never awaited here. If `datum` runs streams
/// asynchronously, their processing may overlap later iterations — confirm.
fn run_cached_ignore_graph(cell: &'static OnceLock<RunnableGraph<NotUsed>>, stages: usize) -> u64 {
    let graph = cell.get_or_init(|| ignore_graph(stages));
    graph.run_with_materializer(shared_materializer()).unwrap();
    0
}

/// Materialization latency of a pre-built graph with 1 `map` operator.
fn materialization_latency_1_operators() -> u64 {
    static GRAPH: OnceLock<RunnableGraph<NotUsed>> = OnceLock::new();
    run_cached_ignore_graph(&GRAPH, 1)
}

/// Materialization latency of a pre-built graph with 10 `map` operators.
fn materialization_latency_10_operators() -> u64 {
    static GRAPH: OnceLock<RunnableGraph<NotUsed>> = OnceLock::new();
    run_cached_ignore_graph(&GRAPH, 10)
}

/// Materialization latency of a pre-built graph with 100 `map` operators.
fn materialization_latency_100_operators() -> u64 {
    static GRAPH: OnceLock<RunnableGraph<NotUsed>> = OnceLock::new();
    run_cached_ignore_graph(&GRAPH, 100)
}
/// Materialized-value combination with `Keep::left`: yields the source side's value (1).
fn materialized_value_keep_left() -> u64 {
    let graph = Source::single(1_u64)
        .map_materialized_value(|_| 1_u64)
        .to_mat(Sink::ignore().map_materialized_value(|_| 2_u64), Keep::left);
    graph.run_with_materializer(shared_materializer()).unwrap()
}

/// Materialized-value combination with `Keep::right`: yields the sink side's value (2).
fn materialized_value_keep_right() -> u64 {
    let graph = Source::single(1_u64)
        .map_materialized_value(|_| 1_u64)
        .to_mat(Sink::ignore().map_materialized_value(|_| 2_u64), Keep::right);
    graph.run_with_materializer(shared_materializer()).unwrap()
}

/// Materialized-value combination with `Keep::both`: yields the pair, summed (wrapping) to 3.
fn materialized_value_keep_both() -> u64 {
    let graph = Source::single(1_u64)
        .map_materialized_value(|_| 1_u64)
        .to_mat(Sink::ignore().map_materialized_value(|_| 2_u64), Keep::both);
    let (source_mat, sink_mat) = graph.run_with_materializer(shared_materializer()).unwrap();
    source_mat.wrapping_add(sink_mat)
}

/// Materialized-value combination with `Keep::none`: discards both values (result is `NotUsed`).
fn materialized_value_keep_none() -> u64 {
    let graph = Source::single(1_u64)
        .map_materialized_value(|_| 1_u64)
        .to_mat(Sink::ignore().map_materialized_value(|_| 2_u64), Keep::none);
    let _: NotUsed = graph.run_with_materializer(shared_materializer()).unwrap();
    0
}

/// Materialized-value combination with a user closure that adds both values (1 + 2 = 3).
fn materialized_value_custom() -> u64 {
    let graph = Source::single(1_u64)
        .map_materialized_value(|_| 1_u64)
        .to_mat(
            Sink::ignore().map_materialized_value(|_| 2_u64),
            |source_mat: u64, sink_mat: u64| source_mat + sink_mat,
        );
    graph.run_with_materializer(shared_materializer()).unwrap()
}
/// Eager source feeding a queue sink. The function pulls the first element and adds it
/// (wrapping) to the source's materialized value (7).
fn eager_source_first_demand() -> u64 {
    let graph = Source::single(1_u64)
        .map_materialized_value(|_| 7_u64)
        .to_mat(Sink::queue(), Keep::both);
    let (source_mat, queue) = graph.run_with_materializer(shared_materializer()).unwrap();
    let first = queue.pull().unwrap().unwrap_or_default();
    first.wrapping_add(source_mat)
}

/// Lazy source feeding a queue sink. The function pulls the first element, then waits for the
/// inner source's materialized value (7) and adds the two (wrapping).
fn lazy_source_first_demand() -> u64 {
    let graph =
        Source::<u64>::lazy_source(|| Source::single(1_u64).map_materialized_value(|_| 7_u64))
            .to_mat(Sink::queue(), Keep::both);
    let (inner_mat, queue) = graph.run_with_materializer(shared_materializer()).unwrap();
    // Pull before waiting: this keeps the original order, in which the first demand comes
    // before waiting on the inner materialized value.
    let first = queue.pull().unwrap().unwrap_or_default();
    first.wrapping_add(inner_mat.wait().unwrap())
}
/// Streams `0..1_000` into `Sink::ignore` and waits for completion; returns 0.
fn sink_terminal_ignore() -> u64 {
    let completion = Source::from_iter(0_u64..1_000)
        .run_with_materializer(Sink::ignore(), shared_materializer())
        .unwrap();
    completion.wait().unwrap();
    0
}

/// Streams `0..1_000` into `Sink::head` and returns the awaited first element.
fn sink_terminal_head() -> u64 {
    let completion = Source::from_iter(0_u64..1_000)
        .run_with_materializer(Sink::head(), shared_materializer())
        .unwrap();
    completion.wait().unwrap()
}

/// Streams `0..1_000` into a wrapping-sum `Sink::fold` and returns the awaited total.
fn sink_terminal_fold() -> u64 {
    let summing = Sink::fold(0_u64, |acc, item| acc.wrapping_add(item));
    let completion = Source::from_iter(0_u64..1_000)
        .run_with_materializer(summing, shared_materializer())
        .unwrap();
    completion.wait().unwrap()
}

/// Streams `0..1_000` into `Sink::foreach`, which adds each element to a shared atomic. After
/// completion, returns the accumulated sum.
fn sink_terminal_foreach() -> u64 {
    let total = std::sync::Arc::new(AtomicU64::new(0));
    let completion = {
        let total = std::sync::Arc::clone(&total);
        Source::from_iter(0_u64..1_000)
            .run_with_materializer(
                Sink::foreach(move |item| {
                    total.fetch_add(item, Ordering::Relaxed);
                }),
                shared_materializer(),
            )
            .unwrap()
    };
    completion.wait().unwrap();
    total.load(Ordering::Relaxed)
}
/// Re-runs one cached graph on every call. The graph maps `0..128` to `x + 1` and sums the
/// results with a wrapping fold; the function returns the awaited sum.
fn repeated_materialization() -> u64 {
    static FOLD_GRAPH: OnceLock<RunnableGraph<datum::StreamCompletion<u64>>> = OnceLock::new();
    let graph = FOLD_GRAPH.get_or_init(|| {
        let incremented = Source::from_iter(0_u64..128).map(|item| item.wrapping_add(1));
        incremented.to_mat(
            Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)),
            Keep::right,
        )
    });
    let completion = graph.run_with_materializer(shared_materializer()).unwrap();
    completion.wait().unwrap()
}