Crate ironbeam

§Ironbeam

A data processing framework for Rust inspired by Apache Beam and Google Cloud Dataflow. Ironbeam provides a declarative API for building batch data pipelines with support for transformations, aggregations, joins, and I/O operations.

§Key Features

  • Declarative pipeline API - chain transformations with a fluent interface
  • Stateless and stateful operations - map, filter, flat_map, group_by_key, combine
  • Built-in combiners - Sum, Min, Max, Average, DistinctCount, TopK, and more
  • Join support - inner, left, right, and full outer joins
  • Side inputs - enrich streams with auxiliary data (vectors and hash maps)
  • Batch processing - optimize CPU-heavy operations with batch transforms
  • Sequential and parallel execution - choose the right mode for your workload
  • I/O integrations - JSON Lines, CSV, and Parquet (all optional via feature flags)
  • Type-safe - leverages Rust’s type system for compile-time correctness

§Quick Start

use ironbeam::*;

// Create a pipeline
let p = Pipeline::default();

// Build a word count pipeline
let lines = from_vec(&p, vec![
    "hello world".to_string(),
    "hello rust".to_string(),
]);

let counts = lines
    .flat_map(|line: &String| {
        line.split_whitespace()
            .map(|w| w.to_string())
            .collect::<Vec<_>>()
    })
    .key_by(|word: &String| word.clone())
    .map_values(|_word: &String| 1u64)
    .combine_values(Count);

// Execute and collect results
let results = counts.collect_seq()?;

§Core Concepts

§Pipeline

A Pipeline is the container for your computation graph. Create one with Pipeline::default(), then attach data sources and transformations to it.
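
A minimal sketch, using the from_vec source that appears throughout these docs:

use ironbeam::*;

// The pipeline owns the computation graph; sources and
// transformations register themselves into it.
let p = Pipeline::default();
let numbers = from_vec(&p, vec![1, 2, 3]);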

§PCollection

A PCollection<T> represents a distributed collection with elements of type T. It’s the fundamental abstraction for data in Ironbeam. Collections are:

  • Immutable - transformations create new collections
  • Lazy - computation happens when you call a collect method
  • Type-safe - generic over the element type
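
Laziness is worth emphasizing: chaining transformations only records nodes in the graph, and nothing executes until you collect. A small sketch:

use ironbeam::*;

let p = Pipeline::default();

// No computation happens here - this only builds graph nodes.
let doubled = from_vec(&p, vec![1, 2, 3]).map(|x: &i32| x * 2);

// Execution is triggered by the collect call.
let results = doubled.collect_seq()?;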

§Transformations

Ironbeam provides two categories of transforms:

§Stateless (element-wise)
  • map - transform each element
  • filter - keep elements matching a predicate
  • flat_map - transform each element into zero or more outputs
  • map_batches - process elements in batches for efficiency
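
Stateless transforms compose freely, as in this small sketch:

use ironbeam::*;

let p = Pipeline::default();
let lines = from_vec(&p, vec!["a b".to_string(), "".to_string()]);

let words = lines
    .filter(|s: &String| !s.is_empty())    // keep non-empty lines
    .flat_map(|s: &String| {
        // one input line becomes zero or more words
        s.split_whitespace().map(String::from).collect::<Vec<_>>()
    })
    .map(|w: &String| w.to_uppercase())    // transform each word
    .collect_seq()?;
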
§Stateful (keyed operations)
  • key_by - assign a key to each element, producing a keyed PCollection<(K, V)>
  • map_values - transform only the value side of a keyed collection
  • group_by_key - gather all values that share a key
  • combine_values / combine_values_lifted - aggregate values per key with a combiner

§Combiners

The combiners module provides reusable aggregation functions, including Sum, Min, Max, AverageF64, DistinctCount, and TopK (see the re-exports below).

You can also implement custom combiners via the CombineFn trait.
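
As a rough sketch of a custom combiner - note that the trait shape below (an Accumulator associated type with create_accumulator, add_input, merge_accumulators, and extract_output methods) is an assumption borrowed from Apache Beam, not taken from this crate's docs; consult CombineFn in the collection module for the real definition:

use ironbeam::*;

// Hypothetical combiner computing the product of u64 values per key.
struct Product;

impl CombineFn<u64> for Product {
    // Assumed associated type and methods; verify against the actual trait.
    type Accumulator = u64;

    fn create_accumulator(&self) -> u64 { 1 }
    fn add_input(&self, acc: u64, input: &u64) -> u64 { acc * *input }
    fn merge_accumulators(&self, a: u64, b: u64) -> u64 { a * b }
    fn extract_output(&self, acc: u64) -> u64 { acc }
}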

§Joins

Ironbeam supports all standard join types for PCollection<(K, V)>: inner, left outer, right outer, and full outer. See join_inner in action under §Join Two Collections below.

§Side Inputs

Enrich your pipeline with auxiliary data using side inputs, built from vectors or hash maps (for example with side_hashmap) and consumed with methods such as map_with_side_map. See §Using Side Inputs below for a complete example.

§Execution Modes

Choose how to execute your pipeline: call collect_seq for sequential execution or collect_par for parallel execution.

Both modes produce the same results; parallel execution is useful for CPU-intensive workloads.
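
Both entry points appear in the sketch below; the parallel variant is commented out since the two are interchangeable for correctness:

use ironbeam::*;

let p = Pipeline::default();
let squared = from_vec(&p, (0..10_000).collect::<Vec<i32>>())
    .map(|x: &i32| x * x);

// Sequential execution:
let results = squared.collect_seq()?;

// Or parallel execution for CPU-bound work:
// let results = squared.collect_par()?;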

§I/O Operations

Ironbeam supports reading and writing common data formats (all optional via feature flags):

§JSON Lines (feature: io-jsonl)

use ironbeam::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Serialize, Deserialize)]
struct Record { id: u32, name: String }

let p = Pipeline::default();

// Read the entire file into memory
let data = read_jsonl::<Record>(&p, "data.jsonl")?;

// Or stream by chunks
let stream = read_jsonl_streaming::<Record>(&p, "data.jsonl", 1000)?;

// Write results
data.write_jsonl("output.jsonl")?;

§CSV (feature: io-csv)

use ironbeam::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Serialize, Deserialize)]
struct Row { k: String, v: u64 }

let p = Pipeline::default();
let data = read_csv::<Row>(&p, "data.csv", true)?; // true = has headers
data.write_csv("output.csv", true)?;

§Parquet (feature: io-parquet)

use ironbeam::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Serialize, Deserialize)]
struct Record { id: u32, score: f64 }

let p = Pipeline::default();
let data = read_parquet_streaming::<Record>(&p, "data.parquet", 1)?;
data.write_parquet("output.parquet")?;

§Feature Flags

  • io-jsonl - Enable JSON Lines I/O support
  • io-csv - Enable CSV I/O support
  • io-parquet - Enable Parquet I/O support (requires Arrow)
  • parallel-io - Enable parallel I/O operations (write_*_par methods)
  • metrics - Enable metrics collection and reporting (enabled by default)
  • checkpointing - Enable automatic checkpointing for fault tolerance (enabled by default)
  • spilling - Enable automatic memory spilling to disk (enabled by default)

§Examples

§Word Count

use ironbeam::*;

let p = Pipeline::default();
let text = from_vec(&p, vec!["foo bar baz".to_string()]);

let counts = text
    .flat_map(|s: &String| s.split_whitespace().map(String::from).collect::<Vec<_>>())
    .key_by(|w: &String| w.clone())
    .map_values(|_: &String| 1u64)
    .combine_values(Count);

let results = counts.collect_seq()?;

§Group and Aggregate

use ironbeam::*;

let p = Pipeline::default();
let sales = from_vec(&p, vec![
    ("product_a".to_string(), 100u64),
    ("product_b".to_string(), 200u64),
    ("product_a".to_string(), 150u64),
]);

let totals = sales.combine_values(Sum::<u64>::default());
let results = totals.collect_seq()?;

§Join Two Collections

use ironbeam::*;

let p = Pipeline::default();

let users = from_vec(&p, vec![
    (1u32, "Alice".to_string()),
    (2u32, "Bob".to_string()),
]);

let scores = from_vec(&p, vec![
    (1u32, 95u32),
    (2u32, 87u32),
    (3u32, 92u32), // no matching user
]);

let joined = users.join_inner(&scores);
let results = joined.collect_seq()?;

§Using Side Inputs

use ironbeam::*;
use std::collections::HashMap;

let p = Pipeline::default();

let events = from_vec(&p, vec!["user_123".to_string(), "user_456".to_string()]);

let user_names = side_hashmap::<String, String>(vec![
    ("user_123".into(), "Alice".into()),
    ("user_456".into(), "Bob".into()),
]);

let enriched = events.map_with_side_map(&user_names, |user_id, names| {
    let name = names.get(user_id).cloned().unwrap_or_else(|| "Unknown".into());
    format!("{}: {}", user_id, name)
});

let results = enriched.collect_seq()?;

§Tracking Metrics

use ironbeam::*;

let p = Pipeline::default();

// Enable metrics collection
let mut metrics = metrics::MetricsCollector::new();
metrics.register(Box::new(metrics::CounterMetric::with_value("input_records", 1000)));
p.set_metrics(metrics);

// Build and execute the pipeline
let data = from_vec(&p, (0..1000).collect::<Vec<i32>>());
let result = data
    .filter(|x: &i32| x % 2 == 0)
    .map(|x: &i32| x * 2)
    .collect_seq()?;

// Print or save metrics after execution
if let Some(metrics) = p.take_metrics() {
    metrics.print();
    // Or save to a file
    metrics.save_to_file("pipeline_metrics.json")?;
}

§Using Checkpointing for Fault Tolerance

use ironbeam::*;
use ironbeam::checkpoint::{CheckpointConfig, CheckpointPolicy};

let p = Pipeline::default();
let data = from_vec(&p, (0..1_000_000).collect::<Vec<i32>>());

// Configure automatic checkpointing
let checkpoint_config = CheckpointConfig {
    enabled: true,
    directory: "./checkpoints".into(),
    policy: CheckpointPolicy::AfterEveryBarrier,
    auto_recover: true,
    max_checkpoints: Some(5),
};

let runner = Runner {
    mode: ExecMode::Sequential,
    checkpoint_config: Some(checkpoint_config),
    ..Default::default()
};

// Build pipeline - checkpoints will be created automatically
let result_collection = data
    .key_by(|x: &i32| x % 100)
    .map_values(|x: &i32| *x as u64)
    .combine_values(Sum::<u64>::default());

// Pipeline will checkpoint after each barrier and can recover on failure
let result = runner.run_collect::<(i32, u64)>(&p, result_collection.node_id())?;

§Testing Your Pipelines

Ironbeam provides comprehensive testing utilities in the testing module to help you write idiomatic Rust tests for your data pipelines.

§Basic Testing

use ironbeam::*;
use ironbeam::testing::*;
use anyhow::Result;

#[test]
fn test_simple_pipeline() -> Result<()> {
    let p = TestPipeline::new();

    let result = from_vec(&p, vec![1, 2, 3])
        .map(|x: &i32| x * 2)
        .collect_seq()?;

    assert_collections_equal(result, vec![2, 4, 6]);
    Ok(())
}

§Testing with Assertions

The testing module provides specialized assertions for collections, such as the assert_collections_equal call used above.
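
A short keyed-pipeline example using that assertion:

use ironbeam::*;
use ironbeam::testing::*;
use anyhow::Result;

#[test]
fn test_keyed_sum() -> Result<()> {
    let p = TestPipeline::new();

    let result = from_vec(&p, vec![
        ("a".to_string(), 1u64),
        ("a".to_string(), 2u64),
        ("b".to_string(), 5u64),
    ])
    .combine_values(Sum::<u64>::default())
    .collect_seq()?;

    assert_collections_equal(result, vec![
        ("a".to_string(), 3u64),
        ("b".to_string(), 5u64),
    ]);
    Ok(())
}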

§Test Data Builders

Create test data fluently with builders:

use ironbeam::testing::*;

let data = TestDataBuilder::<i32>::new()
    .add_range(1..=10)
    .add_value(100)
    .add_repeated(42, 3)
    .build();

let kvs = KVTestDataBuilder::new()
    .add_kv("a", 1)
    .add_key_with_values("b", vec![2, 3, 4])
    .build();

§Debug Utilities

Inspect pipelines during test execution:

use ironbeam::*;
use ironbeam::testing::*;

let p = TestPipeline::new();
let result = from_vec(&p, vec![1, 2, 3])
    .debug_inspect("after source")  // Prints elements to stderr
    .map(|x: &i32| x * 2)
    .debug_count("after map")       // Prints count
    .debug_sample(5, "first 5")     // Prints first 5 elements
    .collect_seq()?;

§Pre-built Fixtures

Use common test datasets for realistic testing:

use ironbeam::testing::*;

let logs = sample_log_entries();       // Web server logs
let words = word_count_data();         // Text data for word counting
let ts = time_series_data();           // Time-series measurements
let users = user_product_interactions(); // Relational data

For more examples, see the testing module documentation and run cargo run --example testing_pipeline.

§Performance Tips

  • Use map_batches for CPU-intensive operations
  • Use streaming I/O for large files that don’t fit in memory
  • Use parallel execution (collect_par) for CPU-bound workloads
  • Combine multiple stateless operations before stateful ones (the planner will fuse them)
  • Use combine_values_lifted after group_by_key for better performance (see the sketch below)
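
For the last tip, a sketch under two assumptions: that group_by_key takes no arguments, and that combine_values_lifted accepts the same combiners as combine_values (the combiners module documents both entry points):

use ironbeam::*;

let p = Pipeline::default();
let sales = from_vec(&p, vec![
    ("a".to_string(), 100u64),
    ("a".to_string(), 50u64),
]);

// Assumed shape: group, then apply a lifted combiner to the grouped values.
let totals = sales
    .group_by_key()
    .combine_values_lifted(Sum::<u64>::default());

let results = totals.collect_seq()?;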

§Architecture

Ironbeam uses a deferred execution model:

  1. Building a pipeline creates a computation graph (DAG) of transformations
  2. The planner optimizes the graph (fusion, etc.)
  3. The runner executes the optimized plan when you call a collect method
  4. Results are materialized into memory
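
The Runner from the checkpointing example above makes the deferred model explicit; this sketch reuses only calls shown earlier in these docs:

use ironbeam::*;

let p = Pipeline::default();

// Steps 1-2: building records a DAG; nothing has executed yet.
let doubled = from_vec(&p, vec![1, 2, 3]).map(|x: &i32| x * 2);

// Steps 3-4: the runner plans, executes, and materializes results.
let runner = Runner { mode: ExecMode::Sequential, ..Default::default() };
let results = runner.run_collect::<i32>(&p, doubled.node_id())?;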

§Module Overview

  • collection - Core PCollection type and transformation methods
  • combiners - Built-in aggregation functions (Sum, Min, Max, etc.)
  • pipeline - Pipeline construction and management
  • io - I/O operations for JSON Lines, CSV, and Parquet
  • runner - Execution engine (sequential and parallel modes)
  • planner - Query optimization and graph transformations
  • helpers - Convenience functions and side input builders
  • extensions - Extension points for custom transforms and I/O
  • metrics - Metrics collection and reporting (feature: metrics)
  • checkpoint - Automatic checkpointing for fault tolerance (feature: checkpointing)

§Extensibility

Ironbeam provides several extension points for adding custom functionality:

§Custom Transforms

Implement DynOp to create custom stateless transformations:

use ironbeam::*;
use ironbeam::node::DynOp;
use ironbeam::type_token::Partition;
use std::sync::Arc;

struct MyCustomOp;
impl DynOp for MyCustomOp {
    fn apply(&self, input: Partition) -> Partition {
        // Put your custom logic here; this placeholder passes the partition through unchanged.
        input
    }
}

let p = Pipeline::default();
let data = from_vec(&p, vec![1, 2, 3]);
let transformed = data.apply_transform::<i32>(Arc::new(MyCustomOp));

§Custom I/O Sources

Implement VecOps to integrate custom data sources. See from_custom_source for a complete example.

§Composite Transforms

Use CompositeTransform to package reusable pipelines:

use ironbeam::*;
use ironbeam::extensions::CompositeTransform;

struct MyComposite;
impl CompositeTransform<String, String> for MyComposite {
    fn expand(&self, input: PCollection<String>) -> PCollection<String> {
        input.map(|s: &String| s.to_uppercase())
             .filter(|s: &String| !s.is_empty())
    }
}
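
Since expand takes and returns a PCollection, one way to apply the composite is to call expand directly (a sketch; the extensions module may offer a dedicated application helper, which is not shown here):

use ironbeam::*;
use ironbeam::extensions::CompositeTransform;

let p = Pipeline::default();
let input = from_vec(&p, vec!["hello".to_string()]);

// MyComposite is the transform defined in the block above.
let output = MyComposite.expand(input);
let results = output.collect_seq()?;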

Re-exports§

pub use collection::CombineFn;
pub use collection::Count;
pub use collection::PCollection;
pub use collection::RFBound;
pub use combiners::AverageF64;
pub use combiners::DistinctCount;
pub use combiners::Max;
pub use combiners::Min;
pub use combiners::Sum;
pub use combiners::TopK;
pub use node_id::NodeId;
pub use pipeline::Pipeline;
pub use planner::CostEstimate;
pub use planner::ExecutionExplanation;
pub use planner::ExplainStep;
pub use planner::OptimizationDecision;
pub use planner::Plan;
pub use planner::build_plan;
pub use runner::ExecMode;
pub use runner::Runner;
pub use type_token::Partition;
pub use utils::OrdF64;
pub use window::TimestampMs;
pub use window::Timestamped;
pub use window::Window;
pub use extensions::CompositeTransform;
pub use node::DynOp;
pub use type_token::TypeTag;
pub use type_token::VecOps;
pub use io::jsonl::read_jsonl_range; (feature: io-jsonl)
pub use io::jsonl::read_jsonl_vec; (feature: io-jsonl)
pub use helpers::jsonl::read_jsonl_streaming; (feature: io-jsonl)
pub use io::jsonl::write_jsonl_par; (features: io-jsonl and parallel-io)
pub use io::csv::read_csv_vec; (feature: io-csv)
pub use io::csv::write_csv; (feature: io-csv)
pub use io::csv::write_csv_vec; (feature: io-csv)
pub use io::csv::write_csv_par; (features: io-csv and parallel-io)
pub use io::parquet::read_parquet_vec; (feature: io-parquet)
pub use io::parquet::write_parquet_vec; (feature: io-parquet)
pub use helpers::csv::read_csv; (feature: io-csv)
pub use helpers::csv::read_csv_streaming; (feature: io-csv)
pub use helpers::jsonl::read_jsonl; (feature: io-jsonl)
pub use helpers::parquet::read_parquet_streaming; (feature: io-parquet)
pub use helpers::*;

Modules§

checkpoint (feature: checkpointing)
Automatic checkpointing for fault tolerance in long-running batch jobs.
collection
Core collection types and internal stateless operators.
combiners
Built-in combiners for combine_values and combine_values_lifted.
extensions
Extension points for custom pipeline operations.
helpers
High-level convenience helpers and extension methods for crate::collection::PCollection.
io
Low-level I/O primitives for data ingestion and export.
metrics (feature: metrics)
Metrics collection and reporting for pipeline execution.
node
Execution graph “nodes” and the dynamic operator trait.
node_id
Lightweight unique identifier for nodes within a Pipeline.
pipeline
In-memory representation of a dataflow pipeline graph.
planner
Query planner and optimizer passes.
runner
Execution engine.
spill (feature: spilling)
Automatic resource spilling for memory-constrained pipelines.
spill_integration
Integration of spilling functionality with the runner.
testing
Testing utilities for Ironbeam pipelines.
type_token
Type tags and type-erased vector helpers.
utils
Utility types and functions for Ironbeam.
validation
Data quality and validation utilities for production pipelines.
window
Event-time windowing primitives.

Macros§

partition
Partition a PCollection of enum values into separate collections by variant.