datum-core 0.3.0

Rust stream-processing library mirroring Akka/Pekko Streams Typed, built on Ractor actors
Documentation
use std::{
    alloc::{GlobalAlloc, Layout, System},
    hint::black_box,
    path::{Path, PathBuf},
    sync::{
        OnceLock,
        atomic::{AtomicU64, Ordering},
    },
    time::Instant,
};

use datum::{
    FileIO, Framing, Keep, Materializer, Sink, Source, TcpIncomingConnection, TokioFileIO, TokioTcp,
};
use std::net::SocketAddr;

/// Bytes in the fixture file read by the file-source scenarios, and total bytes
/// written by the file-sink scenarios (256 KiB).
const FILE_BYTES: usize = 256 << 10;
/// Chunk size for file reads/writes (8 KiB). Also passed as the second argument to
/// `TokioTcp::bind` / `TokioTcp::outgoing_connection` (presumably a read chunk size —
/// confirm against the datum docs).
const FILE_CHUNK_SIZE: usize = 8 << 10;
/// Newline-terminated lines fed to the delimiter-framing scenario.
const FRAMING_LINES: usize = 10 * 1_000;
/// JSON objects in the array fed to the JSON-framing scenario.
const JSON_OBJECTS: usize = 10 * 1_000;
/// Size of the single payload sent in the TCP echo round-trip scenario.
const TCP_PAYLOAD_BYTES: usize = 1 << 6;

/// Allocator that forwards every request to [`System`] and adds the requested
/// sizes to [`ALLOCATED_BYTES`], so the benchmark loop can report bytes allocated
/// per operation.
///
/// The tally only ever grows. `dealloc` subtracts nothing, and a `realloc` that
/// shrinks in place adds nothing. The counter is therefore a running total of
/// bytes handed out, not the live heap size.
struct CountingAllocator;

/// Running total of bytes handed out by [`CountingAllocator`]. `main` resets it to
/// zero right before each measured window.
static ALLOCATED_BYTES: AtomicU64 = AtomicU64::new(0);

/// Adds `bytes` to [`ALLOCATED_BYTES`].
///
/// This runs inside the allocator, so it must not allocate itself. `Relaxed` is
/// enough because the value is a statistic and orders no other memory.
fn record_allocation(bytes: usize) {
    ALLOCATED_BYTES.fetch_add(bytes as u64, Ordering::Relaxed);
}

unsafe impl GlobalAlloc for CountingAllocator {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: `layout` is forwarded unchanged. The caller upholds
        // `GlobalAlloc::alloc`'s contract, which is exactly what `System` requires.
        let block = unsafe { System.alloc(layout) };
        if block.is_null() {
            return block;
        }
        record_allocation(layout.size());
        block
    }

    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: same contract as `alloc`, forwarded unchanged to `System`.
        let block = unsafe { System.alloc_zeroed(layout) };
        if block.is_null() {
            return block;
        }
        record_allocation(layout.size());
        block
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: every block this allocator hands out comes from `System`. The
        // caller guarantees `ptr`/`layout` describe one of those live blocks.
        unsafe { System.dealloc(ptr, layout) };
    }

    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: `ptr` was allocated by `System` (via this allocator) with
        // `layout`. The caller guarantees `new_size` is valid for `layout.align()`.
        let resized = unsafe { System.realloc(ptr, layout, new_size) };
        if resized.is_null() {
            return resized;
        }
        let added = if resized == ptr {
            // Resized in place: only bytes beyond the old size are new.
            new_size.saturating_sub(layout.size())
        } else {
            // Moved: a fresh block of `new_size` bytes was handed out.
            new_size
        };
        record_allocation(added);
        resized
    }
}

// Install `CountingAllocator` as the process-wide allocator. Every heap allocation
// made by any thread in this binary goes through it, which is what lets `main`
// report allocated bytes per operation.
#[global_allocator]
static GLOBAL: CountingAllocator = CountingAllocator;

/// One benchmark case executed by `main`.
struct Scenario {
    /// Label printed in the first column of the TSV output.
    name: &'static str,
    /// Number of timed calls to `run`. `main` also makes untimed warm-up calls first.
    iterations: u64,
    /// The operation being measured. `main` folds its `u64` result into a checksum
    /// passed to `black_box`, so the work cannot be optimised away.
    run: fn() -> u64,
}

fn main() {
    let scenarios = [
        Scenario {
            name: "delimiter_framing_lines_10k",
            iterations: 80,
            run: delimiter_framing_lines_10k,
        },
        Scenario {
            name: "json_framing_objects_10k",
            iterations: 80,
            run: json_framing_objects_10k,
        },
        Scenario {
            name: "file_source_256k_chunk8192",
            iterations: 100,
            run: file_source_256k_chunk8192,
        },
        Scenario {
            name: "file_sink_256k_chunk8192",
            iterations: 100,
            run: file_sink_256k_chunk8192,
        },
        Scenario {
            name: "tokio_file_source_256k_chunk8192",
            iterations: 100,
            run: tokio_file_source_256k_chunk8192,
        },
        Scenario {
            name: "tokio_file_sink_256k_chunk8192",
            iterations: 100,
            run: tokio_file_sink_256k_chunk8192,
        },
        Scenario {
            name: "tcp_echo_roundtrip_64b",
            iterations: 100,
            run: tcp_echo_roundtrip_64b,
        },
    ];

    println!("scenario\titerations\tns_per_op\tallocated_bytes_per_op\tcpu_ns_per_op");
    for scenario in scenarios {
        for _ in 0..3 {
            black_box((scenario.run)());
        }

        let cpu_start = process_cpu_ns();
        ALLOCATED_BYTES.store(0, Ordering::Relaxed);
        let started = Instant::now();
        let mut checksum = 0_u64;
        for _ in 0..scenario.iterations {
            checksum = checksum.wrapping_add(black_box((scenario.run)()));
        }
        let elapsed = started.elapsed();
        let allocated_bytes = ALLOCATED_BYTES.load(Ordering::Relaxed);
        let cpu_delta = process_cpu_ns().saturating_sub(cpu_start);
        black_box(checksum);

        let ns_per_op = elapsed.as_nanos() as f64 / scenario.iterations as f64;
        let allocated_bytes_per_op = allocated_bytes as f64 / scenario.iterations as f64;
        let cpu_ns_per_op = cpu_delta as f64 / scenario.iterations as f64;
        println!(
            "{}\t{}\t{ns_per_op:.2}\t{allocated_bytes_per_op:.2}\t{cpu_ns_per_op:.2}",
            scenario.name, scenario.iterations
        );
    }
}

/// Returns the single [`Materializer`] shared by every scenario, building it on first use.
///
/// Reusing one instance means no per-iteration materializer construction is measured.
fn materializer() -> &'static Materializer {
    static SHARED: OnceLock<Materializer> = OnceLock::new();
    if let Some(ready) = SHARED.get() {
        return ready;
    }
    SHARED.get_or_init(Materializer::new)
}

/// Returns the user + system CPU time consumed so far by the whole process, in nanoseconds.
///
/// Reads `utime` and `stime` (fields 14 and 15 of `/proc/self/stat`, see proc(5)).
/// Clock ticks are converted assuming 100 ticks per second (10 ms per tick). That is
/// the common Linux `USER_HZ`; `sysconf(_SC_CLK_TCK)` is not queried. Returns 0 when
/// the file cannot be read (e.g. non-Linux) or is malformed, so callers see a zero
/// delta instead of an error.
fn process_cpu_ns() -> u128 {
    const NANOS_PER_TICK: u128 = 10_000_000;

    let stat = match std::fs::read_to_string("/proc/self/stat") {
        Ok(text) => text,
        Err(_) => return 0,
    };
    // Field 2 (`comm`) is parenthesised and may itself contain spaces or ')', so
    // splitting starts after the *last* closing parenthesis.
    let after_comm = match stat.rfind(')') {
        Some(idx) => &stat[idx + 1..],
        None => return 0,
    };
    // Index 0 after `comm` is `state` (field 3), so utime/stime sit at indices 11/12.
    let mut tail = after_comm.split_whitespace().skip(11);
    match (tail.next(), tail.next()) {
        (Some(user), Some(system)) => {
            let user_ticks: u64 = user.parse().unwrap_or(0);
            let system_ticks: u64 = system.parse().unwrap_or(0);
            (u128::from(user_ticks) + u128::from(system_ticks)) * NANOS_PER_TICK
        }
        _ => 0,
    }
}

/// Builds `{prefix}-{pid}-{unix_nanos}.bin` inside the system temp directory.
///
/// The pid and a nanosecond timestamp make the name unique per process and per call,
/// so separate benchmark runs do not overwrite each other's files.
///
/// # Panics
/// Panics if the system clock reads earlier than the Unix epoch.
fn unique_temp_path(prefix: &str) -> PathBuf {
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("clock after epoch")
        .as_nanos();
    std::env::temp_dir().join(format!("{prefix}-{}-{nanos}.bin", std::process::id()))
}

/// Path of the fixture file read by the file-source scenarios.
///
/// On first call, the file is created and filled with `FILE_BYTES` bytes of `b'x'`.
/// Later calls return the same path.
///
/// # Panics
/// Panics if the fixture file cannot be written.
fn bench_file_path() -> &'static Path {
    static PATH: OnceLock<PathBuf> = OnceLock::new();
    PATH.get_or_init(|| {
        let path = unique_temp_path("datum-streaming-io-compare");
        std::fs::write(&path, vec![b'x'; FILE_BYTES]).expect("write streaming io compare file");
        path
    })
    .as_path()
}

/// Destination path for `file_sink_256k_chunk8192`, fixed for the life of the process.
///
/// Only the path is computed here; no file is created by this function.
fn sync_sink_file_path() -> &'static Path {
    static PATH: OnceLock<PathBuf> = OnceLock::new();
    PATH.get_or_init(|| unique_temp_path("datum-streaming-io-compare-sync-sink"))
        .as_path()
}

/// Destination path for `tokio_file_sink_256k_chunk8192`, fixed for the life of the process.
///
/// Only the path is computed here; no file is created by this function.
fn tokio_sink_file_path() -> &'static Path {
    static PATH: OnceLock<PathBuf> = OnceLock::new();
    PATH.get_or_init(|| unique_temp_path("datum-streaming-io-compare-tokio-sink"))
        .as_path()
}

fn delimiter_chunks() -> Vec<Vec<u8>> {
    let line = b"line-0123456789\n";
    let mut payload = Vec::with_capacity(line.len() * FRAMING_LINES);
    for _ in 0..FRAMING_LINES {
        payload.extend_from_slice(line);
    }
    payload
        .chunks(257)
        .map(|chunk| chunk.to_vec())
        .collect::<Vec<_>>()
}

fn json_chunks() -> Vec<Vec<u8>> {
    let object = br#"{"id":42,"name":"datum","active":true}"#;
    let mut payload = Vec::with_capacity((object.len() + 1) * JSON_OBJECTS + 2);
    payload.push(b'[');
    for index in 0..JSON_OBJECTS {
        if index > 0 {
            payload.push(b',');
        }
        payload.extend_from_slice(object);
    }
    payload.push(b']');
    payload
        .chunks(193)
        .map(|chunk| chunk.to_vec())
        .collect::<Vec<_>>()
}

/// Builds the file-sink input: `FILE_BYTES / FILE_CHUNK_SIZE` chunks, each holding
/// `FILE_CHUNK_SIZE` bytes of `b'x'`.
fn file_chunks() -> Vec<Vec<u8>> {
    let chunk_count = FILE_BYTES / FILE_CHUNK_SIZE;
    std::iter::repeat_with(|| vec![b'x'; FILE_CHUNK_SIZE])
        .take(chunk_count)
        .collect()
}

/// Runs newline-delimiter framing over `delimiter_chunks()` and returns the number
/// of frames emitted.
///
/// `Framing::delimiter` gets a 64-byte limit and `false` as its third argument.
/// NOTE(review): that is presumably "max frame length" and "allow truncation";
/// confirm against the datum docs. The count is expected to equal `FRAMING_LINES`,
/// but it is not checked here.
fn delimiter_framing_lines_10k() -> u64 {
    let frames = Source::from_iter(delimiter_chunks())
        .via(Framing::delimiter(b"\n".to_vec(), 64, false));
    frames
        .run_with_materializer(Sink::fold(0_u64, |seen, _| seen + 1), materializer())
        .unwrap()
        .wait()
        .unwrap()
}

/// Runs JSON object framing over `json_chunks()` and returns the number of frames emitted.
///
/// `Framing::json` is given `256` (NOTE(review): presumably the maximum object
/// length in bytes; confirm against the datum docs).
fn json_framing_objects_10k() -> u64 {
    let frames = Source::from_iter(json_chunks()).via(Framing::json(256));
    frames
        .run_with_materializer(Sink::fold(0_u64, |seen, _| seen + 1), materializer())
        .unwrap()
        .wait()
        .unwrap()
}

/// Reads the fixture file through `FileIO::from_path` in `FILE_CHUNK_SIZE` chunks
/// and returns the total number of bytes received.
fn file_source_256k_chunk8192() -> u64 {
    let chunks = FileIO::from_path(bench_file_path().to_path_buf(), FILE_CHUNK_SIZE);
    let total_bytes = Sink::fold(0_u64, |sum, chunk: Vec<u8>| sum + chunk.len() as u64);
    chunks
        .run_with_materializer(total_bytes, materializer())
        .unwrap()
        .wait()
        .unwrap()
}

/// Writes `file_chunks()` to the sync sink path through `FileIO::to_path`.
///
/// Returns `FILE_BYTES`, the size of the input, not a count reported by the sink.
/// The sink's materialized value is discarded after `wait()` succeeds.
/// NOTE(review): unlike the Tokio variant, no write status is inspected here;
/// confirm whether `FileIO::to_path` exposes one.
fn file_sink_256k_chunk8192() -> u64 {
    let chunks = Source::from_iter(file_chunks());
    let completion = chunks
        .run_with_materializer(
            FileIO::to_path(sync_sink_file_path().to_path_buf()),
            materializer(),
        )
        .unwrap();
    completion.wait().unwrap();
    FILE_BYTES as u64
}

/// Reads the fixture file through `TokioFileIO::from_path` in `FILE_CHUNK_SIZE`
/// chunks and returns the total number of bytes received.
fn tokio_file_source_256k_chunk8192() -> u64 {
    let chunks = TokioFileIO::from_path(bench_file_path().to_path_buf(), FILE_CHUNK_SIZE);
    let total_bytes = Sink::fold(0_u64, |sum, chunk: Vec<u8>| sum + chunk.len() as u64);
    chunks
        .run_with_materializer(total_bytes, materializer())
        .unwrap()
        .wait()
        .unwrap()
}

/// Writes `file_chunks()` to the Tokio sink path through `TokioFileIO::to_path`.
///
/// Panics if the sink reports a failed write status. Otherwise returns the byte
/// count reported by the sink's result.
fn tokio_file_sink_256k_chunk8192() -> u64 {
    let chunks = Source::from_iter(file_chunks());
    let completion = chunks
        .run_with_materializer(
            TokioFileIO::to_path(tokio_sink_file_path().to_path_buf()),
            materializer(),
        )
        .unwrap();
    let written = completion.wait().unwrap();
    written.status().unwrap();
    written.bytes()
}

/// State kept for the shared TCP echo server started by `tcp_echo_addr`.
struct EchoServer {
    /// Local address the listener is bound to. The port is chosen by the OS
    /// because the server binds `127.0.0.1:0`.
    addr: SocketAddr,
    /// Completion handle of the server stream. It is held only so it stays alive for
    /// the life of the process (NOTE(review): assumes dropping it would stop or
    /// detach the server; confirm against the datum docs).
    _completion: datum::StreamCompletion<datum::NotUsed>,
}

/// Returns the address of the process-wide TCP echo server, starting it on first call.
///
/// The server binds `127.0.0.1:0`, so the OS picks the port. Each accepted
/// connection is answered by running its inbound source into its own outbound sink.
///
/// # Panics
/// Panics on first call if the server fails to materialize or bind.
fn tcp_echo_addr() -> SocketAddr {
    /// Binds the listener, attaches the echo handler, and waits until the bind completes.
    fn start_echo_server() -> EchoServer {
        let listener = TokioTcp::bind("127.0.0.1:0", FILE_CHUNK_SIZE);
        let (binding, completion) = listener
            .to_mat(
                // The handler blocks on `wait()` until each echo stream finishes.
                // NOTE(review): this presumably serialises connections. That is fine
                // for the benchmark, which connects one at a time; confirm before reuse.
                Sink::foreach(|connection: TcpIncomingConnection| {
                    let (inbound, outbound) = connection.into_parts();
                    let echo = inbound
                        .run_with(outbound)
                        .expect("echo stream materializes")
                        .wait()
                        .expect("echo stream completes");
                    echo.status().expect("echo stream writes successfully");
                }),
                Keep::both,
            )
            .run_with_materializer(materializer())
            .expect("echo server materializes");
        let bound = binding.wait().expect("echo server binds");
        EchoServer {
            addr: bound.local_addr(),
            _completion: completion,
        }
    }

    static SERVER: OnceLock<EchoServer> = OnceLock::new();
    SERVER.get_or_init(start_echo_server).addr
}

/// Sends one `TCP_PAYLOAD_BYTES` payload to the shared echo server and returns the
/// length of the first chunk received back.
///
/// NOTE(review): `Sink::head` presumably yields only the first element, and TCP may
/// split the echo, so the result can be smaller than `TCP_PAYLOAD_BYTES`. `main`
/// only folds it into a checksum.
fn tcp_echo_roundtrip_64b() -> u64 {
    let request = Source::single(vec![b'p'; TCP_PAYLOAD_BYTES]);
    let echoed = request.via(TokioTcp::outgoing_connection(tcp_echo_addr(), FILE_CHUNK_SIZE));
    let first_reply = echoed
        .run_with_materializer(Sink::head(), materializer())
        .unwrap()
        .wait()
        .unwrap();
    first_reply.len() as u64
}