Crate ballista[−][src]
Expand description
Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a penalty for serialization costs.
The foundational technologies in Ballista are:
- Apache Arrow memory model and compute kernels for efficient processing of data.
- Apache Arrow Flight Protocol for efficient data transfer between processes.
- Google Protocol Buffers for serializing query plans.
- Docker for packaging up executors along with user-defined code.
Ballista can be deployed as a standalone cluster and also supports Kubernetes. In either case, the scheduler can be configured to use etcd as a backing store to (eventually) provide redundancy in the case of a scheduler failing.
Starting a cluster
There are numerous ways to start a Ballista cluster, including support for Docker and Kubernetes. For full documentation, refer to the DataFusion User Guide
A simple way to start a local cluster for testing purposes is to use cargo to install the scheduler and executor crates.
cargo install ballista-scheduler cargo install ballista-executor
With these crates installed, it is now possible to start a scheduler process.
RUST_LOG=info ballista-scheduler
The scheduler will bind to port 50050 by default.
Next, start an executor processes in a new terminal session with the specified concurrency level.
RUST_LOG=info ballista-executor -c 4
The executor will bind to port 50051 by default. Additional executors can be started by manually specifying a bind port. For example:
RUST_LOG=info ballista-executor --bind-port 50052 -c 4
Executing a query
Ballista provides a BallistaContext
as a starting point for creating queries. DataFrames can be created
by invoking the read_csv
, read_parquet
, and sql
methods.
The following example runs a simple aggregate SQL query against a CSV file from the New York Taxi and Limousine Commission data set.
use ballista::prelude::*; use datafusion::arrow::util::pretty; use datafusion::prelude::CsvReadOptions; #[tokio::main] async fn main() -> Result<()> { // create configuration let config = BallistaConfig::builder() .set("ballista.shuffle.partitions", "4") .build()?; // connect to Ballista scheduler let ctx = BallistaContext::remote("localhost", 50050, &config); // register csv file with the execution context ctx.register_csv( "tripdata", "/path/to/yellow_tripdata_2020-01.csv", CsvReadOptions::new(), )?; // execute the query let df = ctx.sql( "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount) FROM tripdata GROUP BY passenger_count ORDER BY passenger_count", )?; // collect the results and print them to stdout let results = df.collect().await?; pretty::print_batches(&results)?; Ok(()) }