exoware-sql 2026.4.1

SQL engine backed by the Exoware API.
Documentation
use std::alloc::{GlobalAlloc, Layout, System};
use std::collections::BTreeMap;
use std::ops::Bound::{Included, Unbounded};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use axum::Router;
use bytes::Bytes;
use connectrpc::{Chain, ConnectRpcService, Context};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::arrow::datatypes::DataType;
use datafusion::prelude::SessionContext;
use exoware_proto::connect_compression_registry;
use exoware_proto::store::common::v1::KvEntry as ProtoKvEntry;
use exoware_proto::store::ingest::v1::{
    PutResponse as ProtoPutResponse, Service as IngestService, ServiceServer as IngestServiceServer,
};
use exoware_proto::store::query::v1::{
    GetResponse as ProtoGetResponse, RangeFrame as ProtoRangeFrame,
    ReduceResponse as ProtoReduceResponse, Service as QueryService,
    ServiceServer as QueryServiceServer,
};
use exoware_sdk as exoware_proto;
use exoware_sdk::keys::Key;
use exoware_sdk::StoreClient;
use exoware_sql::{CellValue, IndexSpec, KvSchema, TableColumnConfig};
use futures::stream;
use std::pin::Pin;
use tokio::runtime::Runtime;
use tokio::sync::oneshot;

/// Number of allocation requests (`alloc`, `alloc_zeroed`, `realloc`) recorded by
/// `CountingAllocator` since the counter was last reset to zero.
static ALLOC_CALLS: AtomicU64 = AtomicU64::new(0);
/// Sum of the requested sizes, in bytes, of those allocation requests.
/// A `realloc` contributes its full `new_size`, not the difference from the old size.
static ALLOC_BYTES: AtomicU64 = AtomicU64::new(0);

/// Zero-sized allocator type whose `GlobalAlloc` impl counts requests before
/// delegating to the system allocator.
struct CountingAllocator;

// Registered as the global allocator so heap allocations made anywhere in this
// benchmark binary pass through `CountingAllocator`.
#[global_allocator]
static GLOBAL_ALLOCATOR: CountingAllocator = CountingAllocator;

// SAFETY: every method hands the caller's pointer and layout to `System` unchanged, so the
// `GlobalAlloc` contract is upheld by `System` itself; the counter updates never allocate.
unsafe impl GlobalAlloc for CountingAllocator {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        record_allocation(layout.size());
        // SAFETY: the caller guarantees `layout` is valid for `GlobalAlloc::alloc`.
        unsafe { System.alloc(layout) }
    }

    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        record_allocation(layout.size());
        // SAFETY: the caller guarantees `layout` is valid for `GlobalAlloc::alloc_zeroed`.
        unsafe { System.alloc_zeroed(layout) }
    }

    // Frees are forwarded without being counted: only allocation requests are tracked.
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: the caller guarantees `ptr` was allocated by this allocator with `layout`.
        unsafe { System.dealloc(ptr, layout) }
    }

    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // A reallocation counts as one call of `new_size` bytes.
        record_allocation(new_size);
        // SAFETY: the caller guarantees `ptr`/`layout` describe a live allocation from this
        // allocator and that `new_size` is valid for `GlobalAlloc::realloc`.
        unsafe { System.realloc(ptr, layout, new_size) }
    }
}

/// Adds one call and `bytes` requested bytes to the global allocation counters.
fn record_allocation(bytes: usize) {
    ALLOC_CALLS.fetch_add(1, Ordering::Relaxed);
    ALLOC_BYTES.fetch_add(bytes as u64, Ordering::Relaxed);
}

/// Snapshot of the allocation counters taken by `allocation_profile`.
#[derive(Debug, Clone, Copy)]
struct AllocStats {
    /// Value of `ALLOC_CALLS` when the snapshot was taken.
    calls: u64,
    /// Value of `ALLOC_BYTES` when the snapshot was taken.
    bytes: u64,
}

/// Runs `f` once and reports the allocation counters accumulated while it ran.
///
/// Both global counters are zeroed immediately before `f` is called and read back right
/// after it returns. The counters are shared statics, so anything else that bumps them
/// during the call is included in the returned [`AllocStats`].
fn allocation_profile<T>(f: impl FnOnce() -> T) -> (T, AllocStats) {
    for counter in [&ALLOC_CALLS, &ALLOC_BYTES] {
        counter.store(0, Ordering::Relaxed);
    }

    let result = f();

    let calls = ALLOC_CALLS.load(Ordering::Relaxed);
    let bytes = ALLOC_BYTES.load(Ordering::Relaxed);
    (result, AllocStats { calls, bytes })
}

/// State shared by the mock ingest and query services; cloning shares the same
/// underlying map and counter through the `Arc`s.
#[derive(Clone)]
struct MockState {
    /// Ordered key/value map written by `put` and scanned by `range`.
    kv: Arc<Mutex<BTreeMap<Key, Bytes>>>,
    /// Counter incremented once per `put`; its value is returned as the sequence number.
    sequence_number: Arc<AtomicU64>,
}

/// Mock implementation of the ingest service, backed by the shared `MockState`.
#[derive(Clone)]
struct BenchIngest {
    /// Shared in-memory store that `put` writes into.
    state: MockState,
}

impl IngestService for BenchIngest {
    async fn put(
        &self,
        ctx: Context,
        request: buffa::view::OwnedView<exoware_proto::store::ingest::v1::PutRequestView<'static>>,
    ) -> Result<(ProtoPutResponse, Context), connectrpc::ConnectError> {
        let mut parsed = Vec::<(Key, Bytes)>::new();
        for kv in request.kvs.iter() {
            parsed.push((kv.key.to_vec().into(), Bytes::copy_from_slice(kv.value)));
        }
        let mut guard = self.state.kv.lock().expect("kv mutex poisoned");
        for (key, value) in parsed.iter() {
            guard.insert(key.clone(), value.clone());
        }
        let seq = self
            .state
            .sequence_number
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
            + 1;
        Ok((
            ProtoPutResponse {
                sequence_number: seq,
                ..Default::default()
            },
            ctx,
        ))
    }
}

/// Mock implementation of the query service, backed by the shared `MockState`.
/// Only `range` is implemented; the other RPCs return an "unimplemented" error.
#[derive(Clone)]
struct BenchQuery {
    /// Shared in-memory store that `range` reads from.
    state: MockState,
}

impl QueryService for BenchQuery {
    async fn get(
        &self,
        _ctx: Context,
        _request: buffa::view::OwnedView<exoware_proto::store::query::v1::GetRequestView<'static>>,
    ) -> Result<(ProtoGetResponse, Context), connectrpc::ConnectError> {
        Err(connectrpc::ConnectError::unimplemented("bench"))
    }

    async fn get_many(
        &self,
        _ctx: Context,
        _request: buffa::view::OwnedView<
            exoware_proto::store::query::v1::GetManyRequestView<'static>,
        >,
    ) -> Result<
        (
            Pin<
                Box<
                    dyn futures::Stream<
                            Item = Result<
                                exoware_proto::store::query::v1::GetManyFrame,
                                connectrpc::ConnectError,
                            >,
                        > + Send,
                >,
            >,
            Context,
        ),
        connectrpc::ConnectError,
    > {
        Err(connectrpc::ConnectError::unimplemented("bench"))
    }

    async fn range(
        &self,
        _ctx: Context,
        request: buffa::view::OwnedView<exoware_proto::store::query::v1::RangeRequestView<'static>>,
    ) -> Result<
        (
            Pin<
                Box<
                    dyn futures::Stream<Item = Result<ProtoRangeFrame, connectrpc::ConnectError>>
                        + Send,
                >,
            >,
            Context,
        ),
        connectrpc::ConnectError,
    > {
        let start_key = Key::from(request.start.to_vec());
        let end_key = Key::from(request.end.to_vec());
        let limit = request.limit.map(|v| v as usize).unwrap_or(usize::MAX);
        let batch_size = usize::try_from(request.batch_size).unwrap_or(usize::MAX);
        let batch = batch_size.max(1);

        let guard = self.state.kv.lock().expect("kv mutex poisoned");
        let mut results: Vec<ProtoKvEntry> = Vec::new();
        let range: (std::ops::Bound<&Key>, std::ops::Bound<&Key>) = (
            Included(&start_key),
            if end_key.is_empty() {
                Unbounded
            } else {
                Included(&end_key)
            },
        );
        for (key, value) in guard.range::<Key, _>(range).take(limit) {
            results.push(ProtoKvEntry {
                key: key.to_vec(),
                value: value.to_vec(),
                ..Default::default()
            });
        }
        drop(guard);
        let token = self
            .state
            .sequence_number
            .load(std::sync::atomic::Ordering::Relaxed);
        let mut frames: Vec<Result<ProtoRangeFrame, connectrpc::ConnectError>> = Vec::new();
        for chunk in results.chunks(batch) {
            frames.push(Ok(ProtoRangeFrame {
                results: chunk.to_vec(),
                ..Default::default()
            }));
        }
        let detail = exoware_proto::store::query::v1::Detail {
            sequence_number: token,
            read_stats: Default::default(),
            ..Default::default()
        };
        Ok((
            Box::pin(stream::iter(frames)),
            exoware_proto::with_query_detail_trailer(Context::default(), &detail),
        ))
    }

    async fn reduce(
        &self,
        _ctx: Context,
        _request: buffa::view::OwnedView<
            exoware_proto::store::query::v1::ReduceRequestView<'static>,
        >,
    ) -> Result<(ProtoReduceResponse, Context), connectrpc::ConnectError> {
        Err(connectrpc::ConnectError::unimplemented("bench"))
    }
}

/// Runs the benchmark query against `ctx` once and returns the total number of rows across
/// all collected record batches.
///
/// # Panics
/// Panics if the query fails to plan or to execute.
async fn run_query_once(ctx: Arc<SessionContext>) -> usize {
    let query = "SELECT id, amount_cents \
             FROM orders \
             WHERE status = 'open' AND amount_cents >= 0 \
             LIMIT 1000";

    let data_frame = ctx.sql(query).await.expect("query should compile");
    let batches = data_frame.collect().await.expect("query should execute");

    let mut total_rows = 0usize;
    for batch in &batches {
        total_rows += batch.num_rows();
    }
    total_rows
}

/// Seeds the `orders` table with 6,000 rows through a batch writer, then flushes once.
///
/// Row `i` (for `i` in `0..6000`) has id `i`, status `"open"` when `i` is even and
/// `"closed"` when it is odd, and an amount of `i * 10`.
///
/// # Panics
/// Panics if a row cannot be encoded or if the flush fails.
fn build_dataset(schema: &KvSchema, runtime: &Runtime) {
    let seed = async {
        let mut writer = schema.batch_writer();
        for id in 0..6_000i64 {
            let status = match id % 2 {
                0 => "open",
                _ => "closed",
            };
            let row = vec![
                CellValue::Int64(id),
                CellValue::Utf8(status.to_string()),
                CellValue::Int64(id * 10),
            ];
            writer
                .insert("orders", row)
                .expect("row encode should succeed");
        }
        writer.flush().await.expect("seed ingest should succeed");
    };
    runtime.block_on(seed);
}

/// End-to-end read benchmark: starts the in-process mock store on a loopback port, seeds
/// the `orders` table through the store client, registers the schema with a DataFusion
/// session, and then measures the query issued by `run_query_once`.
///
/// Before handing over to Criterion it runs the query once as a warm-up (checking that it
/// returns 1,000 rows) and once more under `allocation_profile`, printing the allocation
/// counters and the wall-clock time of that single run.
///
/// # Panics
/// Panics if any setup step fails or if a query run does not return exactly 1,000 rows.
fn bench_exoware_sql_end_to_end_index_scan(c: &mut Criterion) {
    let runtime = Runtime::new().expect("runtime");

    // Mock store: the ingest and query services share one in-memory map.
    let state = MockState {
        kv: Arc::new(Mutex::new(BTreeMap::new())),
        sequence_number: Arc::new(AtomicU64::new(0)),
    };
    let ingest_server = IngestServiceServer::new(BenchIngest {
        state: state.clone(),
    });
    let query_server = QueryServiceServer::new(BenchQuery { state });
    let connect = ConnectRpcService::new(Chain(ingest_server, query_server))
        .with_compression(connect_compression_registry());
    let app = Router::new().fallback_service(connect);

    // Port 0 lets the OS pick a free port; the bound address is read back afterwards.
    let listener = runtime
        .block_on(tokio::net::TcpListener::bind("127.0.0.1:0"))
        .expect("bind mock server");
    let addr = listener.local_addr().expect("local addr");

    // The server task runs until the shutdown channel fires or its sender is dropped.
    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    runtime.spawn(async move {
        let until_shutdown = async move {
            let _ = shutdown_rx.await;
        };
        axum::serve(listener, app)
            .with_graceful_shutdown(until_shutdown)
            .await
            .expect("mock server should run");
    });

    // Client and schema: `orders` keyed by `id`, with a `status` index covering `amount_cents`.
    let base_url = format!("http://{addr}");
    let client = StoreClient::new(&base_url);
    let order_columns = vec![
        TableColumnConfig::new("id", DataType::Int64, false),
        TableColumnConfig::new("status", DataType::Utf8, false),
        TableColumnConfig::new("amount_cents", DataType::Int64, false),
    ];
    let status_index = IndexSpec::lexicographic("status_idx", vec!["status".to_string()])
        .expect("valid index")
        .with_cover_columns(vec!["amount_cents".to_string()]);
    let schema = KvSchema::new(client.clone())
        .table(
            "orders",
            order_columns,
            vec!["id".to_string()],
            vec![status_index],
        )
        .expect("valid schema");

    build_dataset(&schema, &runtime);

    let session = SessionContext::new();
    schema.register_all(&session).expect("register table");
    let ctx = Arc::new(session);

    // Warm-up run; also checks that the seeded data produces the expected row count.
    let warmup_rows = runtime.block_on(run_query_once(Arc::clone(&ctx)));
    assert_eq!(warmup_rows, 1_000);

    // One profiled run: allocation counters plus wall-clock time for the same execution.
    let started = Instant::now();
    let (_rows, allocs) =
        allocation_profile(|| runtime.block_on(run_query_once(Arc::clone(&ctx))));
    let elapsed = started.elapsed();
    println!(
        "exoware-sql end-to-end allocs: calls={} bytes={}",
        allocs.calls, allocs.bytes
    );
    println!(
        "exoware-sql query elapsed={:.3}ms",
        elapsed.as_secs_f64() * 1000.0,
    );

    c.bench_function("exoware_sql_end_to_end_index_scan", |bencher| {
        bencher.iter(|| {
            let rows = runtime.block_on(run_query_once(Arc::clone(&ctx)));
            assert_eq!(rows, 1_000);
        });
    });

    // Best-effort shutdown signal; a send error only means the server task is already gone.
    let _ = shutdown_tx.send(());
}

/// Criterion entry point for the read-path benchmarks; currently runs only the
/// end-to-end index scan benchmark.
fn read_path_perf(c: &mut Criterion) {
    bench_exoware_sql_end_to_end_index_scan(c);
}

// Register `read_path_perf` in the `benches` group and generate the benchmark `main`.
criterion_group!(benches, read_path_perf);
criterion_main!(benches);