Crate bunsen_firehose

bunsen-firehose: Burn-based Data Pipeline

bunsen-firehose is a column-oriented data pipeline for feeding burn models. You describe what each column is and how each derived column is computed (a FirehoseTableSchema made of ColumnSchema entries plus BuildPlans), then run batches of rows (FirehoseRowBatch) through an executor that applies the registered operators in dependency order.
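To illustrate what "dependency order" involves, the sketch below orders a set of build plans so that each one runs after the plans that produce its input columns, using Kahn's topological sort. It is a std-only sketch under stated assumptions, not the crate's executor: the Plan struct, dependency_order, and the second derived column doubled are all hypothetical, and any column that no plan produces is assumed to be a base column filled in by the caller.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Hypothetical stand-in for a build plan: the columns it reads and writes.
/// (Not the crate's BuildPlan type.)
struct Plan {
    name: &'static str,
    inputs: Vec<&'static str>,
    outputs: Vec<&'static str>,
}

/// Orders plans so each runs after the plans producing its inputs (Kahn's
/// algorithm). Columns with no producing plan are treated as base columns.
/// Returns an error on duplicate producers or a dependency cycle.
fn dependency_order(plans: &[Plan]) -> Result<Vec<&'static str>, String> {
    // Map each output column to the index of the plan that produces it.
    let mut producer: HashMap<&str, usize> = HashMap::new();
    for (i, plan) in plans.iter().enumerate() {
        for &col in &plan.outputs {
            if producer.insert(col, i).is_some() {
                return Err(format!("column `{col}` has more than one producer"));
            }
        }
    }
    // For each plan, count the distinct upstream plans it waits on.
    let mut indegree = vec![0usize; plans.len()];
    let mut dependents: Vec<Vec<usize>> = vec![Vec::new(); plans.len()];
    for (i, plan) in plans.iter().enumerate() {
        let upstream: HashSet<usize> = plan
            .inputs
            .iter()
            .filter_map(|col| producer.get(col).copied())
            .collect();
        for u in upstream {
            indegree[i] += 1;
            dependents[u].push(i);
        }
    }
    // Repeatedly emit plans whose inputs are all available.
    let mut ready: VecDeque<usize> = (0..plans.len()).filter(|&i| indegree[i] == 0).collect();
    let mut order = Vec::with_capacity(plans.len());
    while let Some(i) = ready.pop_front() {
        order.push(plans[i].name);
        for &d in &dependents[i] {
            indegree[d] -= 1;
            if indegree[d] == 0 {
                ready.push_back(d);
            }
        }
    }
    if order.len() == plans.len() {
        Ok(order)
    } else {
        Err("dependency cycle among build plans".to_string())
    }
}

fn main() {
    // Listed out of order on purpose: "double" reads the "result" column.
    let plans = vec![
        Plan { name: "double", inputs: vec!["result"], outputs: vec!["doubled"] },
        Plan { name: "add", inputs: vec!["x", "y"], outputs: vec!["result"] },
    ];
    let order = dependency_order(&plans).unwrap();
    println!("{:?}", order); // prints ["add", "double"]
}
```

Because ordering is derived from column reads and writes, declaration order does not matter; a cycle (two plans each reading the other's output) is reported as an error rather than looping.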

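The moving parts:

- Schema: a FirehoseTableSchema lists each column's ColumnSchema, plus the build plans for derived columns.
- Operators: a FirehoseOperator computes output columns one row at a time via apply_to_row on a FirehoseRowTransaction.
- Environment: an environment such as MapOpEnvironment maps operator ids to factories (for example SimpleConfigOperatorFactory) that build operators from their JSON config.
- Planning: an OperationPlan validates an operator's parameter types against the schema and appends the derived column and its build plan.
- Execution: an executor such as SequentialBatchExecutor runs every build plan, in dependency order, over a FirehoseRowBatch.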

Example: define an operator, plan a column, run a batch

This wires up a one-operator pipeline end to end: a custom Add operator is registered into an environment, a derived column result = x + y + bias is planned onto a schema, and a two-row batch is executed.

use std::sync::Arc;

use serde::{Deserialize, Serialize};

use bunsen_firehose::core::{
    FirehoseRowBatch, FirehoseRowReader, FirehoseRowTransaction, FirehoseRowWriter,
    FirehoseTableSchema,
    operations::{
        environment::MapOpEnvironment,
        executor::{FirehoseBatchExecutor, SequentialBatchExecutor},
        factory::SimpleConfigOperatorFactory,
        operator::FirehoseOperator,
        planner::OperationPlan,
        signature::{FirehoseOperatorSignature, ParameterSpec},
    },
    schema::ColumnSchema,
};
use bunsen_firehose::define_firehose_operator_id;

// Defines `pub static ADD: &str = "fh:op://<module>::ADD";`.
define_firehose_operator_id!(ADD);

/// Computes `x + y + bias`. The struct *is* the (deserializable) config.
#[derive(Debug, Serialize, Deserialize)]
struct Add {
    bias: i64,
}

impl FirehoseOperator for Add {
    fn apply_to_row(&self, txn: &mut FirehoseRowTransaction) -> anyhow::Result<()> {
        let x = txn.expect_get_parsed::<i64>("x");
        let y = txn.expect_get_parsed::<i64>("y");
        txn.expect_set_serialized("result", x + y + self.bias);
        Ok(())
    }
}

fn main() -> anyhow::Result<()> {
    // An environment maps operator ids to factories that build operators
    // from their (JSON) config.
    let factory: Arc<SimpleConfigOperatorFactory<Add>> =
        Arc::new(SimpleConfigOperatorFactory::new(
            FirehoseOperatorSignature::from_operator_id(ADD)
                .with_description("Adds x + y + bias")
                .with_input(ParameterSpec::new::<i64>("x"))
                .with_input(ParameterSpec::new::<i64>("y"))
                .with_output(ParameterSpec::new::<i64>("result")),
        ));
    let env = Arc::new(MapOpEnvironment::from_operators(vec![factory])?);

    // Start from the base columns, then *plan* the derived `result` column.
    // Planning validates the operator's parameter types against the schema
    // and appends both the new column and its build plan.
    let mut schema = FirehoseTableSchema::from_columns(&[
        ColumnSchema::new::<i64>("x"),
        ColumnSchema::new::<i64>("y"),
    ]);
    OperationPlan::for_operation_id(ADD)
        .with_input("x", "x")
        .with_input("y", "y")
        .with_output("result", "result")
        .with_config(Add { bias: 100 })
        .apply_to_schema(&mut schema, env.as_ref())?;
    let schema = Arc::new(schema);

    // A `SequentialBatchExecutor` runs every build plan in dependency order.
    let executor = SequentialBatchExecutor::new(schema.clone(), env.clone())?;

    // Fill a batch with the base columns; the executor derives `result`.
    let mut batch = FirehoseRowBatch::new_with_size(schema.clone(), 2);
    batch[0].expect_set_serialized("x", 10_i64);
    batch[0].expect_set_serialized("y", 20_i64);
    batch[1].expect_set_serialized("x", -5_i64);
    batch[1].expect_set_serialized("y", 2_i64);

    executor.execute_batch(&mut batch)?;

    assert_eq!(batch[0].expect_get_parsed::<i64>("result"), 130);
    assert_eq!(batch[1].expect_get_parsed::<i64>("result"), 97);
    Ok(())
}
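To make the mechanics of the example concrete without depending on the crate, here is a std-only toy model of the same flow: an environment maps an operator id to a factory, the factory builds an operator from its config, and an executor applies the operators to every row of a batch. Everything in it (Row, RowOp, Factory, build_add, execute_batch, and the id string) is hypothetical. The real crate appears to store serialized column values (note expect_set_serialized) and builds operators from JSON config, whereas this sketch uses plain i64 values and a numeric parameter map.

```rust
use std::collections::HashMap;

// Hypothetical row: column name -> value (plain i64s instead of serialized values).
type Row = HashMap<String, i64>;

// Hypothetical stand-in for FirehoseOperator: updates one row in place.
trait RowOp {
    fn apply_to_row(&self, row: &mut Row) -> Result<(), String>;
}

// Same computation as the example's Add: result = x + y + bias.
struct Add {
    bias: i64,
}

impl RowOp for Add {
    fn apply_to_row(&self, row: &mut Row) -> Result<(), String> {
        let x = *row.get("x").ok_or("missing column `x`")?;
        let y = *row.get("y").ok_or("missing column `y`")?;
        row.insert("result".to_string(), x + y + self.bias);
        Ok(())
    }
}

// Hypothetical factory: builds an operator from numeric config params.
type Factory = fn(&HashMap<&str, i64>) -> Result<Box<dyn RowOp>, String>;

fn build_add(cfg: &HashMap<&str, i64>) -> Result<Box<dyn RowOp>, String> {
    let bias = *cfg.get("bias").ok_or("Add: missing config `bias`")?;
    Ok(Box::new(Add { bias }))
}

// Builds every step's operator once, then applies them in order to each row.
fn execute_batch(
    env: &HashMap<&str, Factory>,
    steps: &[(&str, HashMap<&str, i64>)],
    batch: &mut [Row],
) -> Result<(), String> {
    let ops = steps
        .iter()
        .map(|(id, cfg)| -> Result<Box<dyn RowOp>, String> {
            let factory = env
                .get(id)
                .ok_or_else(|| format!("unknown operator `{id}`"))?;
            factory(cfg)
        })
        .collect::<Result<Vec<_>, String>>()?;
    for row in batch.iter_mut() {
        for op in &ops {
            op.apply_to_row(row)?;
        }
    }
    Ok(())
}

// Hypothetical id string in the "fh:op://<module>::NAME" shape.
const ADD_ID: &str = "fh:op://example::ADD";

fn main() -> Result<(), String> {
    // Environment: operator id -> factory.
    let mut env: HashMap<&str, Factory> = HashMap::new();
    env.insert(ADD_ID, build_add);

    // One planned step: run ADD with bias = 100.
    let steps = vec![(ADD_ID, HashMap::from([("bias", 100_i64)]))];

    // Base columns only; the executor derives "result".
    let mut batch = vec![
        Row::from([("x".to_string(), 10), ("y".to_string(), 20)]),
        Row::from([("x".to_string(), -5), ("y".to_string(), 2)]),
    ];
    execute_batch(&env, &steps, &mut batch)?;

    assert_eq!(batch[0]["result"], 130);
    assert_eq!(batch[1]["result"], 97);
    Ok(())
}
```

Building operators once per batch and then looping over rows keeps the per-row work to plain function calls, and an unknown operator id fails before any row is touched.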

For a full training pipeline (image loading, augmentation, and a burn DataLoaderBuilder driven by a FirehoseExecutorBatcher), see the resnet_tiny example under demos/bimm/examples.

Modules

burn
Burn Integration Module.
core
New Data Table module.
ops
Namespace of common operators.
utility
Utility

Macros

define_firehose_operator
Combined macro to define and register a firehose operator.
define_firehose_operator_id
Define a self-referential operator ID.
define_self_referential_id
Define a self-referential ID.
register_firehose_operator_factory
Macro to register a default operator factory.
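The example's comment says define_firehose_operator_id!(ADD) defines pub static ADD: &str = "fh:op://<module>::ADD";, i.e. an id that names its own module path and identifier. The sketch below reproduces that shape with the standard concat!, module_path!, and stringify! macros. The macro name define_operator_id is hypothetical and this is not the crate's actual expansion, which may differ in detail.

```rust
// Hypothetical macro in the shape described for define_firehose_operator_id!:
// the id string embeds the defining module's path and the static's own name.
macro_rules! define_operator_id {
    ($name:ident) => {
        pub static $name: &str =
            concat!("fh:op://", module_path!(), "::", stringify!($name));
    };
}

// An id defined inside a nested module picks up that module's path.
mod image_ops {
    define_operator_id!(RESIZE);
}

define_operator_id!(ADD);

fn main() {
    // For a binary crate named `demo`, these print
    // "fh:op://demo::ADD" and "fh:op://demo::image_ops::RESIZE".
    println!("{}", ADD);
    println!("{}", image_ops::RESIZE);
}
```

Because module_path!() expands at the invocation site, two operators with the same name in different modules still get distinct ids.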