datafusion 48.0.1

DataFusion is an in-memory query engine that uses Apache Arrow as the memory model.
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#[macro_use]
extern crate criterion;
extern crate arrow;
extern crate datafusion;

mod data_utils;
use crate::criterion::Criterion;
use data_utils::{create_table_provider, make_data};
use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::{datasource::MemTable, error::Result};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::TaskContext;

use parking_lot::Mutex;
use std::{sync::Arc, time::Duration};
use tokio::runtime::Runtime;

/// Runs `sql` to completion against the shared session, blocking on `rt`.
///
/// The statement is first turned into a `DataFrame` through
/// `SessionContext::sql` and then collected. The collected result is handed to
/// `criterion::black_box` to keep the optimizer from discarding it.
///
/// # Panics
///
/// Panics if building the `DataFrame` or collecting it returns an error.
fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
    let session = ctx.lock();
    let df = rt.block_on(session.sql(sql)).unwrap();
    // The lock is only held while the `DataFrame` is built; release it before
    // the query is executed.
    drop(session);

    let batches = rt.block_on(df.collect()).unwrap();
    criterion::black_box(batches);
}

/// Builds a session containing a single table named `t` and wraps it in an
/// `Arc<Mutex<_>>` so it can be shared between benchmark closures.
///
/// The table is produced by `data_utils::create_table_provider`, which receives
/// `partitions_len`, `array_len` and `batch_size` unchanged.
///
/// # Errors
///
/// Propagates any error from `create_table_provider` or from registering the
/// table with the session.
fn create_context(
    partitions_len: usize,
    array_len: usize,
    batch_size: usize,
) -> Result<Arc<Mutex<SessionContext>>> {
    let session = SessionContext::new();
    let table = create_table_provider(partitions_len, array_len, batch_size)?;
    session.register_table("t", table)?;

    let shared = Mutex::new(session);
    Ok(Arc::new(shared))
}

/// Benchmarks `DISTINCT` / `GROUP BY` queries with a `LIMIT` against table `t`.
///
/// All cases share one session created by `create_context` and one Tokio
/// runtime, and are registered in the `custom-measurement-time` group with a
/// 40 second measurement window.
///
/// # Panics
///
/// Panics if the session or the runtime cannot be created, or if any query
/// fails while being benchmarked.
fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
    const PARTITIONS_LEN: usize = 10;
    const ARRAY_LEN: usize = 1 << 26; // 64 M
    const BATCH_SIZE: usize = 8192;

    // Benchmark id paired with the SQL it runs, in registration order.
    const CASES: [(&str, &str); 5] = [
        (
            "distinct_group_by_u64_narrow_limit_10",
            "SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 10",
        ),
        (
            "distinct_group_by_u64_narrow_limit_100",
            "SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 100",
        ),
        (
            "distinct_group_by_u64_narrow_limit_1000",
            "SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 1000",
        ),
        (
            "distinct_group_by_u64_narrow_limit_10000",
            "SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 10000",
        ),
        (
            "group_by_multiple_columns_limit_10",
            "SELECT u64_narrow, u64_wide, utf8, f64 FROM t GROUP BY 1, 2, 3, 4 LIMIT 10",
        ),
    ];

    let ctx = create_context(PARTITIONS_LEN, ARRAY_LEN, BATCH_SIZE).unwrap();
    let rt = Runtime::new().unwrap();

    let mut group = c.benchmark_group("custom-measurement-time");
    group.measurement_time(Duration::from_secs(40));

    for (id, sql) in CASES {
        group.bench_function(id, |b| b.iter(|| query(ctx.clone(), &rt, sql)));
    }

    group.finish();
}

/// Executes `plan` with `ctx` and checks the shape of the result.
///
/// The collected output must consist of exactly one batch holding exactly
/// ten rows.
///
/// # Errors
///
/// Returns the error produced by `collect` if execution fails.
///
/// # Panics
///
/// Panics if the result is not a single batch, or if that batch does not
/// contain 10 rows.
async fn distinct_with_limit(
    plan: Arc<dyn ExecutionPlan>,
    ctx: Arc<TaskContext>,
) -> Result<()> {
    let results = collect(plan, ctx).await?;

    assert_eq!(results.len(), 1);
    // Indexing is safe here: the assertion above guarantees one element.
    assert_eq!(results[0].num_rows(), 10);

    Ok(())
}

/// Drives `distinct_with_limit` to completion on `rt` for the given plan.
///
/// The result is passed through `criterion::black_box` before being unwrapped.
///
/// # Panics
///
/// Panics if `distinct_with_limit` returns an error or one of its assertions
/// fails.
fn run(rt: &Runtime, plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>) {
    // `plan` and `ctx` are owned by this function and not used again, so they
    // are moved into the future rather than cloned.
    criterion::black_box(rt.block_on(distinct_with_limit(plan, ctx))).unwrap();
}

/// Builds a physical plan for `sql` over a freshly generated in-memory table.
///
/// Data is generated with `data_utils::make_data` from `partition_cnt` and
/// `sample_cnt`, wrapped in a `MemTable`, and registered as `traces` in a new
/// session. The function returns the physical plan for `sql` together with the
/// session's task context.
///
/// # Errors
///
/// Returns an error if the `MemTable` cannot be built, the table cannot be
/// registered, or `sql` cannot be turned into a physical plan.
///
/// # Panics
///
/// Panics if `make_data` fails.
pub async fn create_context_sampled_data(
    sql: &str,
    partition_cnt: i32,
    sample_cnt: i32,
) -> Result<(Arc<dyn ExecutionPlan>, Arc<TaskContext>)> {
    // NOTE(review): `make_data` is defined in `data_utils` and its error type
    // is not visible here, so it keeps `unwrap` instead of `?` — switch to `?`
    // if it returns the DataFusion error type.
    let (schema, parts) =
        make_data(partition_cnt, sample_cnt, false /* asc */, false).unwrap();
    // Propagate construction failures to the caller instead of panicking.
    let mem_table = Arc::new(MemTable::try_new(schema, parts)?);

    // Register the generated data in a fresh session under the name `traces`.
    let cfg = SessionConfig::new();
    let ctx = SessionContext::new_with_config(cfg);
    ctx.register_table("traces", mem_table)?;

    // Build the physical plan for the query.
    let df = ctx.sql(sql).await?;
    let physical_plan = df.create_physical_plan().await?;
    Ok((physical_plan, ctx.task_ctx()))
}

fn criterion_benchmark_limited_distinct_sampled(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let limit = 10;
    let partitions = 100;
    let samples = 100_000;
    let sql =
        format!("select DISTINCT trace_id from traces group by trace_id limit {limit};");
    c.bench_function(
        format!("distinct query with {partitions} partitions and {samples} samples per partition with limit {limit}").as_str(),
        |b| b.iter(|| {
            let (plan, ctx) = rt.block_on(
                create_context_sampled_data(sql.as_str(), partitions, samples)
            ).unwrap();
            run(&rt, plan.clone(), ctx.clone())
        }),
    );

    let partitions = 10;
    let samples = 1_000_000;
    let sql =
        format!("select DISTINCT trace_id from traces group by trace_id limit {limit};");
    c.bench_function(
        format!("distinct query with {partitions} partitions and {samples} samples per partition with limit {limit}").as_str(),
        |b| b.iter(|| {
            let (plan, ctx) = rt.block_on(
                create_context_sampled_data(sql.as_str(), partitions, samples)
            ).unwrap();
            run(&rt, plan.clone(), ctx.clone())
        }),
    );

    let partitions = 1;
    let samples = 10_000_000;
    let sql =
        format!("select DISTINCT trace_id from traces group by trace_id limit {limit};");
    c.bench_function(
        format!("distinct query with {partitions} partitions and {samples} samples per partition with limit {limit}").as_str(),
        |b| b.iter(|| {
            let (plan, ctx) = rt.block_on(
                create_context_sampled_data(sql.as_str(), partitions, samples)
            ).unwrap();
            run(&rt, plan.clone(), ctx.clone())
        }),
    );
}

// Collect both benchmark functions into the `benches` group and let Criterion
// generate the entry point that runs that group.
criterion_group!(
    benches,
    criterion_benchmark_limited_distinct,
    criterion_benchmark_limited_distinct_sampled
);
criterion_main!(benches);