dbsp 0.302.0

Continuous streaming analytics engine
Documentation
use anyhow::{Context, Result, anyhow};
use crossbeam::channel::{Sender, bounded};
use dbsp::circuit::{CircuitConfig, CircuitStorageConfig};
use dbsp::{
    Runtime,
    mimalloc::MiMalloc,
    operator::{MapHandle, Update},
    utils::{Tup2, Tup5},
};
use feldera_types::config::{StorageCacheConfig, StorageConfig, StorageOptions};
use rand::{Rng, RngCore, SeedableRng};
use rand_chacha::ChaCha8Rng;
use std::{
    thread::{self, JoinHandle},
    time::{Duration, Instant},
};
use tempfile::tempdir;

const BATCH_SIZE: usize = 10_000;
const PROGRESS_EVERY_BATCHES: usize = 100;
const WORKERS: usize = 2;
const DATAGEN_THREADS: usize = 8;
const TOTAL_RECORDS: u64 = 70_000_000;
const KEY_SPACE: u64 = 100_000_000;
const SEED: u64 = 0;
const MAX_IN_FLIGHT_BATCHES: usize = 64;

type Value = Tup5<u64, u64, u64, u64, u64>;
type BatchRecord = Tup2<u64, Update<Value, Value>>;
type Batch = Vec<BatchRecord>;

#[global_allocator]
static ALLOC: MiMalloc = MiMalloc;

fn main() -> Result<()> {
    validate_constants()?;
    let temp = tempdir().context("failed to create temp directory for storage backend")?;
    let config = CircuitConfig::with_workers(WORKERS).with_storage(Some(
        CircuitStorageConfig::for_config(
            StorageConfig {
                path: temp.path().to_string_lossy().into_owned(),
                cache: StorageCacheConfig::default(),
            },
            StorageOptions {
                min_storage_bytes: None,
                min_step_storage_bytes: None,
                ..StorageOptions::default()
            },
        )
        .context("failed to configure POSIX storage backend")?,
    ));

    let total_batches = (TOTAL_RECORDS / BATCH_SIZE as u64) as usize;
    println!(
        "Running input_map_ingest benchmark with {} workers, {} datagen threads, {} records ({} batches of {})",
        WORKERS, DATAGEN_THREADS, TOTAL_RECORDS, total_batches, BATCH_SIZE
    );

    let (mut dbsp, mut input_handle) = Runtime::init_circuit(config, |circuit| {
        let (stream, handle): (_, MapHandle<u64, Value, Value>) =
            circuit.add_input_map::<u64, Value, Value, _>(|v, u| *v = *u);

        stream.inspect(|_| {});
        Ok(handle)
    })
    .context("failed to initialize DBSP circuit")?;

    let (sender, receiver) = bounded::<Batch>(MAX_IN_FLIGHT_BATCHES);
    let datagen_handles = spawn_datagen_threads(total_batches, sender);

    let mut total_step_duration = Duration::ZERO;
    let benchmark_start = Instant::now();
    for batch_idx in 0..total_batches {
        let mut batch = receiver
            .recv()
            .map_err(|_| anyhow!("datagen channel closed before sending all batches"))?;
        input_handle.append(&mut batch);

        let step_start = Instant::now();
        dbsp.transaction().context("DBSP transaction/step failed")?;
        total_step_duration += step_start.elapsed();

        let processed_batches = batch_idx + 1;
        if processed_batches % PROGRESS_EVERY_BATCHES == 0 || processed_batches == total_batches {
            let processed_records = processed_batches as u64 * BATCH_SIZE as u64;
            let elapsed = benchmark_start.elapsed().as_secs_f64();
            let percent = (processed_batches as f64 / total_batches as f64) * 100.0;
            println!(
                "progress: {processed_batches}/{total_batches} batches ({processed_records}/{TOTAL_RECORDS} records, {percent:.2}%), wall_clock_secs={elapsed:.3}"
            );
        }
    }

    let profile_path = dbsp.dump_profile("profile").unwrap();
    println!("profile dumped to {profile_path:?}");

    for handle in datagen_handles {
        handle
            .join()
            .map_err(|_| anyhow!("datagen thread panicked"))?
            .context("datagen thread failed")?;
    }

    dbsp.kill()
        .map_err(|_| anyhow!("failed to stop DBSP runtime"))?;

    let wall_clock = benchmark_start.elapsed();
    let step_seconds = total_step_duration.as_secs_f64();
    let throughput = TOTAL_RECORDS as f64 / step_seconds;

    println!("total_step_duration_secs={step_seconds}");
    println!("throughput_records_per_sec={throughput}");
    println!("wall_clock_secs={}", wall_clock.as_secs_f64());

    Ok(())
}

fn validate_constants() -> Result<()> {
    if WORKERS == 0 {
        return Err(anyhow!("WORKERS must be > 0"));
    }
    if DATAGEN_THREADS == 0 {
        return Err(anyhow!("DATAGEN_THREADS must be > 0"));
    }
    if KEY_SPACE == 0 {
        return Err(anyhow!("KEY_SPACE must be > 0"));
    }
    if TOTAL_RECORDS == 0 || !TOTAL_RECORDS.is_multiple_of(BATCH_SIZE as u64) {
        return Err(anyhow!(
            "TOTAL_RECORDS must be > 0 and divisible by {BATCH_SIZE}"
        ));
    }
    if MAX_IN_FLIGHT_BATCHES == 0 {
        return Err(anyhow!("MAX_IN_FLIGHT_BATCHES must be > 0"));
    }
    Ok(())
}

fn spawn_datagen_threads(
    total_batches: usize,
    sender: Sender<Batch>,
) -> Vec<JoinHandle<Result<()>>> {
    let mut handles = Vec::with_capacity(DATAGEN_THREADS);
    let batches_per_thread = total_batches / DATAGEN_THREADS;
    let extra_batches = total_batches % DATAGEN_THREADS;

    for thread_id in 0..DATAGEN_THREADS {
        let thread_sender = sender.clone();
        let batches_for_thread = batches_per_thread + usize::from(thread_id < extra_batches);
        let thread_seed = SEED.wrapping_add((thread_id as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15));

        handles.push(thread::spawn(move || {
            datagen_thread(thread_sender, batches_for_thread, KEY_SPACE, thread_seed)
        }));
    }

    drop(sender);
    handles
}

fn datagen_thread(sender: Sender<Batch>, batches: usize, key_space: u64, seed: u64) -> Result<()> {
    let mut rng = ChaCha8Rng::seed_from_u64(seed);

    for _ in 0..batches {
        let mut batch = Vec::with_capacity(BATCH_SIZE);
        for _ in 0..BATCH_SIZE {
            let key = rng.gen_range(0..key_space);
            let value = Tup5(
                rng.next_u64(),
                rng.next_u64(),
                rng.next_u64(),
                rng.next_u64(),
                rng.next_u64(),
            );
            batch.push(Tup2(key, Update::Insert(value)));
        }

        sender
            .send(batch)
            .map_err(|_| anyhow!("receiver dropped while datagen was sending batches"))?;
    }

    Ok(())
}