use std::{
alloc::{GlobalAlloc, Layout, System},
hint::black_box,
path::{Path, PathBuf},
sync::{
OnceLock,
atomic::{AtomicU64, Ordering},
},
time::Instant,
};
use datum::{
FileIO, Framing, InputStreamHandle, Keep, Materializer, OutputStreamHandle, Sink, Source,
StreamConverters, TcpIncomingConnection, TokioFileIO, TokioTcp,
};
use std::net::SocketAddr;
use std::time::Duration;
const FILE_BYTES: usize = 256 * 1024;
const FILE_CHUNK_SIZE: usize = 8192;
const FRAMING_LINES: usize = 10_000;
const JSON_OBJECTS: usize = 10_000;
const TCP_PAYLOAD_BYTES: usize = 64;
const ADAPTER_BYTES: usize = 256 * 1024;
const ADAPTER_CHUNK_SIZE: usize = 1024;
/// Global allocator that forwards every request to [`System`] while keeping a
/// running tally of bytes handed out, so `main` can report allocation volume per
/// benchmark iteration.
struct CountingAllocator;

/// Bytes handed out through [`CountingAllocator`]; `main` resets it to zero before
/// each timed loop.
///
/// The counter only grows: frees and in-place shrinks are never subtracted. It is
/// process-wide, so allocations made on any thread are included. It is updated with
/// `Relaxed` ordering because it is a statistic, not a synchronization point.
static ALLOCATED_BYTES: AtomicU64 = AtomicU64::new(0);

/// Adds `bytes` to [`ALLOCATED_BYTES`].
fn record_allocation(bytes: usize) {
    ALLOCATED_BYTES.fetch_add(bytes as u64, Ordering::Relaxed);
}

// SAFETY: every method forwards its exact arguments to `System`, a valid
// `GlobalAlloc`, so the layout/pointer contracts pass through unchanged. The only
// extra work is an atomic add, which neither allocates nor unwinds.
unsafe impl GlobalAlloc for CountingAllocator {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller guarantees `layout` is valid for `alloc`.
        let block = unsafe { System.alloc(layout) };
        if !block.is_null() {
            record_allocation(layout.size());
        }
        block
    }

    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: same contract as `alloc`, forwarded unchanged.
        let block = unsafe { System.alloc_zeroed(layout) };
        if !block.is_null() {
            record_allocation(layout.size());
        }
        block
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: the caller guarantees `ptr` came from this allocator (and therefore
        // from `System`) with this `layout`.
        unsafe { System.dealloc(ptr, layout) }
    }

    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: the caller guarantees `ptr`/`layout` describe a live block from this
        // allocator and that `new_size` is valid for `layout.align()`.
        let resized = unsafe { System.realloc(ptr, layout, new_size) };
        if resized.is_null() {
            // Failure: the original block is untouched and nothing new was handed out.
            return resized;
        }
        let fresh_bytes = if resized == ptr {
            // Resized in place: only growth beyond the old size counts.
            new_size.saturating_sub(layout.size())
        } else {
            // Moved: the whole new block is a fresh allocation.
            new_size
        };
        if fresh_bytes > 0 {
            record_allocation(fresh_bytes);
        }
        resized
    }
}

/// Routes every heap allocation in the process through the counter.
#[global_allocator]
static GLOBAL: CountingAllocator = CountingAllocator;
/// One benchmark case driven by `main`.
struct Scenario {
    /// Label printed in the first output column.
    name: &'static str,
    /// Number of timed invocations of `run` (the warm-up invocations are not counted).
    iterations: u64,
    /// Workload for a single iteration. Its return value is passed through
    /// `black_box` and folded into a checksum, which discourages the optimizer from
    /// discarding the work.
    run: fn() -> u64,
}
fn main() {
let scenarios = [
Scenario {
name: "delimiter_framing_lines_10k",
iterations: 80,
run: delimiter_framing_lines_10k,
},
Scenario {
name: "json_framing_objects_10k",
iterations: 80,
run: json_framing_objects_10k,
},
Scenario {
name: "file_source_256k_chunk8192",
iterations: 100,
run: file_source_256k_chunk8192,
},
Scenario {
name: "file_sink_256k_chunk8192",
iterations: 100,
run: file_sink_256k_chunk8192,
},
Scenario {
name: "tokio_file_source_256k_chunk8192",
iterations: 100,
run: tokio_file_source_256k_chunk8192,
},
Scenario {
name: "tokio_file_sink_256k_chunk8192",
iterations: 100,
run: tokio_file_sink_256k_chunk8192,
},
Scenario {
name: "tcp_echo_roundtrip_64b",
iterations: 100,
run: tcp_echo_roundtrip_64b,
},
Scenario {
name: "adapter_roundtrip_256k",
iterations: 100,
run: adapter_roundtrip_256k,
},
];
println!("scenario\titerations\tns_per_op\tallocated_bytes_per_op\tcpu_ns_per_op");
for scenario in scenarios {
for _ in 0..3 {
black_box((scenario.run)());
}
let cpu_start = process_cpu_ns();
ALLOCATED_BYTES.store(0, Ordering::Relaxed);
let started = Instant::now();
let mut checksum = 0_u64;
for _ in 0..scenario.iterations {
checksum = checksum.wrapping_add(black_box((scenario.run)()));
}
let elapsed = started.elapsed();
let allocated_bytes = ALLOCATED_BYTES.load(Ordering::Relaxed);
let cpu_delta = process_cpu_ns().saturating_sub(cpu_start);
black_box(checksum);
let ns_per_op = elapsed.as_nanos() as f64 / scenario.iterations as f64;
let allocated_bytes_per_op = allocated_bytes as f64 / scenario.iterations as f64;
let cpu_ns_per_op = cpu_delta as f64 / scenario.iterations as f64;
println!(
"{}\t{}\t{ns_per_op:.2}\t{allocated_bytes_per_op:.2}\t{cpu_ns_per_op:.2}",
scenario.name, scenario.iterations
);
}
}
/// Returns the process-wide [`Materializer`], creating it on first use so every
/// caller in this file that asks for it shares a single instance.
fn materializer() -> &'static Materializer {
    static SHARED_MATERIALIZER: OnceLock<Materializer> = OnceLock::new();
    SHARED_MATERIALIZER.get_or_init(Materializer::new)
}
/// Returns the CPU time (user + system) consumed so far by the whole process, in
/// nanoseconds, parsed from `/proc/self/stat`.
///
/// Returns 0 when the file is missing or malformed (e.g. on non-Linux hosts), so the
/// `cpu_ns_per_op` column then reads 0. Unparsable tick fields count as 0.
///
/// `utime`/`stime` are in clock ticks. This assumes 100 ticks per second (10 ms per
/// tick), so the result has 10 ms resolution. NOTE(review): confirm USER_HZ on the
/// target host.
fn process_cpu_ns() -> u128 {
    const NS_PER_TICK: u128 = 10_000_000;
    // Fields after the `)` closing the command name start at field 3 (`state`), so
    // utime (field 14) sits at offset 11 and stime (field 15) right after it.
    const UTIME_OFFSET: usize = 11;

    let stat = match std::fs::read_to_string("/proc/self/stat") {
        Ok(text) => text,
        Err(_) => return 0,
    };
    // The command name may contain spaces or ')', so split after the LAST ')'.
    let after_comm = match stat.rfind(')') {
        Some(idx) => &stat[idx + 1..],
        None => return 0,
    };
    let mut ticks = after_comm.split_whitespace().skip(UTIME_OFFSET);
    let (Some(utime), Some(stime)) = (ticks.next(), ticks.next()) else {
        return 0;
    };
    let total_ticks =
        u128::from(utime.parse::<u64>().unwrap_or(0)) + u128::from(stime.parse::<u64>().unwrap_or(0));
    total_ticks * NS_PER_TICK
}
/// Builds `<temp dir>/<stem>-<pid>-<unix nanos>.bin`.
///
/// The process id plus the wall-clock time keep names distinct across concurrent and
/// repeated runs. A clock set before the Unix epoch falls back to 0 nanoseconds
/// instead of panicking. The pid and the distinct stems still keep one run's paths
/// apart.
fn unique_temp_path(stem: &str) -> PathBuf {
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_nanos());
    std::env::temp_dir().join(format!("{stem}-{}-{nanos}.bin", std::process::id()))
}

/// Path of the `FILE_BYTES`-long file of `b'x'` read by the file-source scenarios.
/// The file is written once per process, on first call.
///
/// # Panics
/// Panics if the file cannot be written.
fn bench_file_path() -> &'static Path {
    static PATH: OnceLock<PathBuf> = OnceLock::new();
    PATH.get_or_init(|| {
        let path = unique_temp_path("datum-streaming-io-compare");
        std::fs::write(&path, vec![b'x'; FILE_BYTES]).expect("write streaming io compare file");
        path
    })
    .as_path()
}

/// Destination path for the synchronous `FileIO` sink scenario. It is fixed per
/// process and not created here.
fn sync_sink_file_path() -> &'static Path {
    static PATH: OnceLock<PathBuf> = OnceLock::new();
    PATH.get_or_init(|| unique_temp_path("datum-streaming-io-compare-sync-sink"))
        .as_path()
}

/// Destination path for the `TokioFileIO` sink scenario. It is fixed per process and
/// not created here.
fn tokio_sink_file_path() -> &'static Path {
    static PATH: OnceLock<PathBuf> = OnceLock::new();
    PATH.get_or_init(|| unique_temp_path("datum-streaming-io-compare-tokio-sink"))
        .as_path()
}
fn delimiter_chunks() -> Vec<Vec<u8>> {
let line = b"line-0123456789\n";
let mut payload = Vec::with_capacity(line.len() * FRAMING_LINES);
for _ in 0..FRAMING_LINES {
payload.extend_from_slice(line);
}
payload
.chunks(257)
.map(|chunk| chunk.to_vec())
.collect::<Vec<_>>()
}
/// Builds the JSON-framing input: a JSON array of `JSON_OBJECTS` identical objects,
/// re-cut into 193-byte chunks.
fn json_chunks() -> Vec<Vec<u8>> {
    const OBJECT: &[u8] = br#"{"id":42,"name":"datum","active":true}"#;
    let mut array = Vec::with_capacity((OBJECT.len() + 1) * JSON_OBJECTS + 2);
    array.push(b'[');
    let mut objects = std::iter::repeat(OBJECT).take(JSON_OBJECTS);
    if let Some(first) = objects.next() {
        array.extend_from_slice(first);
        // Every later object is preceded by a separator.
        for object in objects {
            array.push(b',');
            array.extend_from_slice(object);
        }
    }
    array.push(b']');
    array.chunks(193).map(<[u8]>::to_vec).collect()
}
/// Builds the file-sink input: `FILE_BYTES / FILE_CHUNK_SIZE` chunks of
/// `FILE_CHUNK_SIZE` bytes of `b'x'` each.
fn file_chunks() -> Vec<Vec<u8>> {
    let chunk_count = FILE_BYTES / FILE_CHUNK_SIZE;
    (0..chunk_count)
        .map(|_| vec![b'x'; FILE_CHUNK_SIZE])
        .collect()
}
/// Runs `delimiter_chunks()` through `Framing::delimiter(b"\n", 64, false)` and
/// returns how many frames reach the sink.
///
/// # Panics
/// Panics if the stream fails to materialize or complete.
fn delimiter_framing_lines_10k() -> u64 {
    let framed = Source::from_iter(delimiter_chunks())
        .via(Framing::delimiter(b"\n".to_vec(), 64, false));
    framed
        .run_with_materializer(Sink::fold(0_u64, |frames, _| frames + 1), materializer())
        .unwrap()
        .wait()
        .unwrap()
}
/// Runs `json_chunks()` through `Framing::json(256)` and returns how many frames
/// reach the sink.
///
/// # Panics
/// Panics if the stream fails to materialize or complete.
fn json_framing_objects_10k() -> u64 {
    let framed = Source::from_iter(json_chunks()).via(Framing::json(256));
    framed
        .run_with_materializer(Sink::fold(0_u64, |objects, _| objects + 1), materializer())
        .unwrap()
        .wait()
        .unwrap()
}
/// Reads the shared bench file through `FileIO::from_path` in `FILE_CHUNK_SIZE`
/// chunks and returns the total number of bytes received.
///
/// # Panics
/// Panics if the stream fails to materialize or complete.
fn file_source_256k_chunk8192() -> u64 {
    let byte_total = Sink::fold(0_u64, |bytes, chunk: Vec<u8>| bytes + chunk.len() as u64);
    let completion = FileIO::from_path(bench_file_path().to_path_buf(), FILE_CHUNK_SIZE)
        .run_with_materializer(byte_total, materializer())
        .unwrap();
    completion.wait().unwrap()
}
/// Streams `file_chunks()` into `FileIO::to_path` (the synchronous file sink) and
/// returns `FILE_BYTES`, the size of the input submitted.
///
/// # Panics
/// Panics if the stream fails to materialize or complete.
fn file_sink_256k_chunk8192() -> u64 {
    let completion = Source::from_iter(file_chunks())
        .run_with_materializer(
            FileIO::to_path(sync_sink_file_path().to_path_buf()),
            materializer(),
        )
        .unwrap();
    // NOTE(review): the sink's materialized value is discarded and the returned count
    // is the constant input size. The Tokio variant instead checks `status()` and
    // returns `bytes()`. Confirm whether `FileIO::to_path` exposes the same result, so
    // write errors are not missed here.
    completion.wait().unwrap();
    FILE_BYTES as u64
}
/// Reads the shared bench file through `TokioFileIO::from_path` in `FILE_CHUNK_SIZE`
/// chunks and returns the total number of bytes received.
///
/// # Panics
/// Panics if the stream fails to materialize or complete.
fn tokio_file_source_256k_chunk8192() -> u64 {
    let source = TokioFileIO::from_path(bench_file_path().to_path_buf(), FILE_CHUNK_SIZE);
    source
        .run_with_materializer(
            Sink::fold(0_u64, |bytes, chunk: Vec<u8>| bytes + chunk.len() as u64),
            materializer(),
        )
        .unwrap()
        .wait()
        .unwrap()
}
/// Streams `file_chunks()` into `TokioFileIO::to_path` and returns the byte count
/// reported by the sink's result.
///
/// # Panics
/// Panics if the stream fails to materialize or complete, or if the result's
/// `status()` is an error.
fn tokio_file_sink_256k_chunk8192() -> u64 {
    let target = tokio_sink_file_path().to_path_buf();
    let io_result = Source::from_iter(file_chunks())
        .run_with_materializer(TokioFileIO::to_path(target), materializer())
        .unwrap()
        .wait()
        .unwrap();
    // Surface a failed write instead of reporting a byte count for it.
    io_result.status().unwrap();
    io_result.bytes()
}
/// Loopback TCP echo server, stored once per process by `tcp_echo_addr`.
struct EchoServer {
    /// Local address reported by the binding (bound from `127.0.0.1:0`).
    addr: SocketAddr,
    /// Completion handle of the server stream. It is never read; it is kept only so
    /// the handle lives as long as the stored server.
    _completion: datum::StreamCompletion<datum::NotUsed>,
}
/// Returns the address of the lazily started loopback echo server.
///
/// The first call binds `TokioTcp` on `127.0.0.1:0`, waits for the binding, and
/// caches the server in a process-wide `OnceLock`. Later calls return the cached
/// address.
///
/// # Panics
/// Panics if the server fails to materialize or bind.
fn tcp_echo_addr() -> SocketAddr {
    static SERVER: OnceLock<EchoServer> = OnceLock::new();
    let server = SERVER.get_or_init(|| {
        let (binding, completion) = TokioTcp::bind("127.0.0.1:0", FILE_CHUNK_SIZE)
            .to_mat(
                Sink::foreach(|connection: TcpIncomingConnection| echo_connection(connection)),
                Keep::both,
            )
            .run_with_materializer(materializer())
            .expect("echo server materializes");
        let bound = binding.wait().expect("echo server binds");
        EchoServer {
            addr: bound.local_addr(),
            _completion: completion,
        }
    });
    server.addr
}

/// Handles one accepted connection by running its source straight into its own sink,
/// blocking until that echo stream completes.
///
/// NOTE(review): this uses `run_with` rather than the shared `materializer()`. Because
/// it blocks inside `Sink::foreach`, connections are presumably handled one at a
/// time. Confirm both are intended.
///
/// # Panics
/// Panics if the echo stream fails to materialize or complete, or reports a write error.
fn echo_connection(connection: TcpIncomingConnection) {
    let (incoming, outgoing) = connection.into_parts();
    let outcome = incoming
        .run_with(outgoing)
        .expect("echo stream materializes")
        .wait()
        .expect("echo stream completes");
    outcome.status().expect("echo stream writes successfully");
}
/// Sends one `TCP_PAYLOAD_BYTES` payload of `b'p'` to the echo server over a fresh
/// outgoing connection and returns the length of the first chunk received back.
///
/// NOTE(review): `Sink::head` keeps only the first element, so a reply split across
/// several reads would report fewer than `TCP_PAYLOAD_BYTES`. Confirm this cannot
/// happen for 64 bytes on loopback.
///
/// # Panics
/// Panics if the stream fails to materialize or complete.
fn tcp_echo_roundtrip_64b() -> u64 {
    let payload = vec![b'p'; TCP_PAYLOAD_BYTES];
    let connection = TokioTcp::outgoing_connection(tcp_echo_addr(), FILE_CHUNK_SIZE);
    let echoed = Source::single(payload)
        .via(connection)
        .run_with_materializer(Sink::head(), materializer())
        .unwrap()
        .wait()
        .unwrap();
    echoed.len() as u64
}
/// Builds the adapter input: `ADAPTER_BYTES / ADAPTER_CHUNK_SIZE` chunks of
/// `ADAPTER_CHUNK_SIZE` bytes of `b'a'` each.
fn adapter_chunks() -> Vec<Vec<u8>> {
    std::iter::repeat_with(|| vec![b'a'; ADAPTER_CHUNK_SIZE])
        .take(ADAPTER_BYTES / ADAPTER_CHUNK_SIZE)
        .collect()
}
/// Pushes `ADAPTER_BYTES` through an output-stream → input-stream converter pipeline
/// and returns the number of bytes read back.
///
/// The main thread writes `adapter_chunks()` into the `OutputStreamHandle` and then
/// closes it. A spawned thread reads from the `InputStreamHandle` into a buffer until
/// `ADAPTER_BYTES` have arrived or the stream ends. Both converters receive
/// `Duration::from_secs(30)`, presumably a blocking-I/O timeout (TODO confirm).
///
/// # Panics
/// Panics if materialization, any write or read, closing the output, or joining the
/// reader fails. It also panics if the stream ends before all `ADAPTER_BYTES`
/// arrive, so a truncated round-trip cannot be timed as if it succeeded.
fn adapter_roundtrip_256k() -> u64 {
    use std::io::{Read, Write};

    let (mut out_handle, mut in_handle): (OutputStreamHandle, InputStreamHandle) =
        StreamConverters::as_output_stream(Duration::from_secs(30))
            .to_mat(
                StreamConverters::as_input_stream(Duration::from_secs(30)),
                Keep::both,
            )
            .run_with_materializer(materializer())
            .expect("adapter round-trip materializes");
    let chunks = black_box(adapter_chunks());
    let read_thread = std::thread::spawn(move || {
        let mut buf = vec![0_u8; ADAPTER_BYTES];
        let mut total = 0_usize;
        while total < ADAPTER_BYTES {
            let n = in_handle.read(&mut buf[total..]).expect("read chunk");
            if n == 0 {
                // End of stream: stop and let the caller check for truncation.
                break;
            }
            total += n;
        }
        total
    });
    for chunk in &chunks {
        out_handle.write_all(chunk).expect("write chunk");
    }
    out_handle.close().expect("close output");
    let received = read_thread.join().expect("read thread joins");
    assert_eq!(
        received, ADAPTER_BYTES,
        "adapter round-trip ended early: received {received} of {ADAPTER_BYTES} bytes"
    );
    received as u64
}