Skip to main content

Module tutorial

Module tutorial 

Source
Expand description

§Developer tutorial

This tutorial aims to be a start at teaching Rust developers how to use DBSP in their projects.

All of the programs in this tutorial are provided as examples under examples/tutorial. You can run each of them with, e.g. cargo run --example tutorial1.

§Table of contents

§Introduction

Computation in DBSP is a two-stage process. First, create a DBSP “circuit”, which defines the computation’s structure, including its inputs and outputs1. Second, any number of times, feed in input changes, tell DBSP to run the circuits, and then read out the output changes. A skeleton for a DBSP program might look like this (the second and later steps could iterate any number of times):

fn main() {
    // ...build circuit...
    // ...feed data into circuit...
    // ...execute circuit...
    // ...read output from circuit...
}

The following section shows the basics of how to fill in each of these steps.

§Basics

This section shows off the basics of input, computation, and output. Afterward, we’ll show how to do more sophisticated computation.

§Input

To process data in DBSP, we need to get data from somewhere. The dbsp_adapters crate in crates/adapters implements input and output adapters for a number of formats and transports along with a server that instantiates a DBSP pipeline and adapters based on a user-provided declarative configuration. In this tutorial we take a different approach, instantiating the pipeline and pushing data to it directly using the Rust API. Specifically, we will parse some data from a CSV file and bring it into a circuit.

Let’s work with the Our World in Data public-domain dataset on COVID-19 vaccinations, which is available on Github. Its main data file on vaccinations is vaccinations.csv, which contains about 168,000 rows of data. That’s a lot to stick in the DBSP repo, so we’ve included a subset with data for just a few countries. The full version of the snapshot of the data excerpted here is freely available on Github.

The vaccination data has 16 columns per row. We will only look at three of those: location, a country name; date, a date in the form yyyy-mm-dd; and daily_vaccinations, the number of vaccinations given on date in location. The latter field is sometimes blank.

Rust crates have good support for reading this data. We can combine the csv crate to read CSV files with serde for deserializing into a struct and time for parsing the date field. A full program for parsing and printing the data is below and in tutorial1.rs:

use anyhow::Result;
use csv::Reader;
use serde::Deserialize;
use time::Date;

#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct Record {
    location: String,
    date: Date,
    daily_vaccinations: Option<u64>,
}

fn main() -> Result<()> {
    let path = format!(
        "{}/examples/tutorial/vaccinations.csv",
        env!("CARGO_MANIFEST_DIR")
    );
    for result in Reader::from_path(path)?.deserialize() {
        let record: Record = result?;
        println!("{:?}", record);
    }
    Ok(())
}

If we run this, then it prints the records in Debug format. Here are the first few:

Record { location: "England", date: 2021-01-10, daily_vaccinations: None }
Record { location: "England", date: 2021-01-11, daily_vaccinations: Some(140441) }
Record { location: "England", date: 2021-01-12, daily_vaccinations: Some(164043) }
Record { location: "England", date: 2021-01-13, daily_vaccinations: Some(192088) }
Record { location: "England", date: 2021-01-14, daily_vaccinations: Some(213978) }
...

We want to create a DBSP circuit and bring this data into it. We create a circuit with RootCircuit::build, which creates an empty circuit, calls a callback that we pass it to add input and computation and output to the circuit, and then fixes the form of the circuit and returns the circuit plus anything we returned from our callback. The code skeleton is like this:

fn build_circuit(circuit: &mut RootCircuit) -> Result<()> {
    // ...populate `circuit` with operators...
    Ok((/*handles*/))
}

fn main() -> Result<()> {
    // Build circuit.
    let (circuit, (/*handles*/)) = RootCircuit::build(build_circuit)?;

    // ...feed data into circuit...
    // ...execute circuit...
    // ...read output from circuit...
}

The natural way to bring our data into the circuit is through a “Z-set” (ZSet) input stream. A “Z-set” is a set in which each item is associated with an integer weight. In the context of changes to a data set, positive weights represent insertions and negative weights represent deletions. The magnitude of the weight represents a count, so that a weight of 1 represents an insertion of a single copy of a record, 2 represents two copies, and so on, and similarly for negative weights and deletions. Thus, a Z-set represents changes to a multiset.

We create the Z-set input stream inside build_circuit using RootCircuit::add_input_zset, which returns a Stream for further use in build_circuit and a ZSetHandle for main to use to feed in data. Our skeleton fills in as shown below. We’re jumping the gun a bit by adding a call to inspect on the Stream. This method calls a closure on each batch of data that passes through; we’re having it print the total weight in our Z-set just to demonstrate that something is happening:

fn build_circuit(circuit: &mut RootCircuit) -> Result<ZSetHandle<Record>> {
    let (input_stream, input_handle) = circuit.add_input_zset::<Record>();
    input_stream.inspect(|records| {
        println!("{}", records.weighted_count());
    });
    // ...populate `circuit` with more operators...
    Ok(input_handle)
}
fn main() -> Result<()> {
    // Build circuit.
    let (circuit, input_handle) = RootCircuit::build(build_circuit)?;

    // ...feed data into circuit...
    // ...execute circuit...
    // ...read output from circuit...
}

The best way to feed the records into input_handle is to collect them into a Vec<(Record, ZWeight)>, where ZWeight (an alias for i64) is the Z-set weight. All the weights can be 1, since we are inserting each of them. We feed them in with ZSetHandle::append. So, we can fill in // ...feed data into circuit... with:

     // Feed data into circuit.
   let path = format!(
       "{}/examples/tutorial/vaccinations.csv",
       env!("CARGO_MANIFEST_DIR")
   );
   let mut input_records = Reader::from_path(path)?
       .deserialize()
       .map(|result| result.map(|record| Tup2(record, 1)))
       .collect::<Result<Vec<Tup2<Record, ZWeight>>, _>>()?;
   input_handle.append(&mut input_records);

}

💡 The code above uses Tup2<Record, ZWeight> where (Record, ZWeight) would be the obvious type. DBSP has its own tuple-like types Tup0, Tup1, …, Tup10 because Rust does not allow DBSP to implement foreign traits on the standard tuple types.

The compiler will point out a problem: Record lacks several traits required for the record type of the “Z-sets”. We need SizeOf from the size_of crate and Archive, Serialize, and Deserialize from the rkyv crate. We can derive all of them:

use rkyv::{Archive, Serialize};
use size_of::SizeOf;
use chrono::NaiveDate;
use dbsp::utils;

#[derive(
    Clone,
    Default,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Hash,
    SizeOf,
    Archive,
    Serialize,
    rkyv::Deserialize,
    serde::Deserialize,
    feldera_macros::IsNone,
)]
#[archive_attr(derive(Ord, Eq, PartialEq, PartialOrd))]

💡 There are two Deserialize traits above. DBSP requires rkyv::Deserialize to support distributed computations, by allowing data to be moved from one host to another. Our example uses serde::Deserialize to parse CSV.

§Execution

Our program now builds a circuit and feeds data into it. To execute it, we just replace // ...execute circuit... with a call to CircuitHandle::transaction:

     // Execute circuit.
     circuit.transaction()?;

Now, if you run our program, with cargo run --example tutorial2, it prints 3961, the number of records in vaccinations.csv. That’s because our program reads an entire CSV file and feeds it as input in a single step. That means that running for more steps wouldn’t make a difference. That’s not a normal use case for DBSP but, arguably, it’s a reasonable setup for a tutorial.

§Computation and output

We haven’t done any computation inside the circuit, nor have we brought output back out of the circuit yet. Let’s add both of those to our skeleton.

Let’s do just enough computation to demonstrate the concept. Suppose we want to pick out a subset of the records. We can use Stream::filter to do that. For example, we can take just the records for locations in the United Kingdom:

    let subset = input_stream.filter(|r| {
        r.location == "England"
            || r.location == "Northern Ireland"
            || r.location == "Scotland"
            || r.location == "Wales"
    });

We could call inspect again to print the results. Instead, let’s bring the results out of the computation into main and print them there. That’s just a matter of calling Stream::output, which returns OutputHandle to return to main, which can then read out the data after each step. Our build_circuit then looks like this:

fn build_circuit(
    circuit: &mut RootCircuit,
) -> Result<(ZSetHandle<Record>, OutputHandle<OrdZSet<Record>>)> {
    let (input_stream, input_handle) = circuit.add_input_zset::<Record>();
    input_stream.inspect(|records| {
        println!("{}", records.weighted_count());
    });
    let subset = input_stream.filter(|r| {
        r.location == "England"
            || r.location == "Northern Ireland"
            || r.location == "Scotland"
            || r.location == "Wales"
    });
    Ok((input_handle, subset.output()))
}

Back in main, we need to update the call to RootCircuit::build so that we save the new output_handle. Then, after we feed in input and execute the circuit, we can read the output. For general kinds of output, it can be a little tricky using OutputHandle, because it supports multithreaded DBSP runtimes that produce one output per thread. For Z-set output, one can just call its consolidate method, which internally merges the multiple outputs if multiple threads are in use. To print the number of records, we can just do the following:

    // Build circuit.
    let (circuit, (input_handle, output_handle)) = RootCircuit::build(build_circuit)?;
    // ...unchanged code to feed data into circuit and execute circuit...

    // Read output from circuit.
    println!("{}", output_handle.consolidate().weighted_count());

Now, if we run it, it prints 3961, as before, followed by 3083. The latter is from the println! in main and shows that we did select a subset of the 3,961 total records.

The full program is in tutorial3.rs.

§More sophisticated computation

Our program only does trivial computation, but DBSP supports much more sophistication. Let’s look at some of what it can do.

§Aggregation

3,083 records is a lot. There’s so much because we’ve got years of daily data. Let’s aggregate daily vaccinations into months, to get monthly vaccinations. DBSP has several forms of aggregation. All of them work with “indexed Z-sets” (IndexedZSet), which are Z-sets of key-value pairs, that is, they associate key-value pairs with weights. Aggregation happens across records with the same key.

We will do the equivalent of the following SQL:

SELECT SUM(daily_vaccinations) FROM vaccinations GROUP BY location, year, month.

where year and month are derived from date.

To aggregate daily vaccinations over months by location, we need to transform our Z-set into an indexed Z-set where the key (the index) has the form (location, year, month) and the value is daily vaccinations (we could keep the whole record but we’d just throw away most of it later). To do this, we call Stream::map_index, passing in a function that maps a record into a key-value tuple:

    let monthly_totals = subset
        .map_index(|r| {
            (
                Tup3(r.location.clone(), r.date.year(), r.date.month() as u8),
                r.daily_vaccinations.unwrap_or(0),
            )
        })

We need to clone the location because it is a String that the records incorporate by value.

Then we can call Stream::aggregate_linear, the simplest form of aggregation in DBSP, to sum across months. This function sums the output of a function. To get monthly vaccinations, we just sum the values from our indexed Z-set (we have to convert to ZWeight because aggregation implicitly multiplies by record weights):

        .aggregate_linear(|v| *v as ZWeight);

We output the indexed Z-set as before, and then in main print it record by record:

fn build_circuit(
    circuit: &mut RootCircuit,
) -> Result<(
    ZSetHandle<Record>,
    OutputHandle<OrdIndexedZSet<Tup3<String, i32, u8>, ZWeight>>,
)> {
    // ...
    Ok((input_handle, monthly_totals.output()))
  }

  fn main() -> Result<()> {
    // ...
    output_handle
        .consolidate()
        .iter()
        .for_each(|(Tup3(l, y, m), sum, w)| println!("{l:16} {y}-{m:02} {sum:10}: {w:+}"));

    Ok(())
}

The output looks like the following. The +1s are the Z-set weights. They show that each record represents an insertion of a single row:

England          2021-01    5600174: +1
England          2021-02    9377418: +1
England          2021-03   11861175: +1
England          2021-04   11288945: +1
England          2021-05   13772946: +1
England          2021-06   10944915: +1
...
Northern Ireland 2021-01     150315: +1
Northern Ireland 2021-02     317074: +1
...
Wales            2023-01      33838: +1
Wales            2023-02      17098: +1
Wales            2023-03       8776: +1

The full program is in tutorial4.rs.

§Rolling aggregation

By using a “moving average” to average recent data, we can obtain a dataset with less noise due to variation from month to month. DBSP provides Stream::partitioned_rolling_average for this purpose. To use it, we have to index our Z-set by time. DBSP uses the time component, which must have an unsigned integer type, to define the window:

    let moving_averages = monthly_totals
        .map_index(|(Tup3(l, y, m), v)| (*y as u32 * 12 + (*m as u32 - 1), Tup2(l.clone(), *v)))

Once we’ve done that, computing the moving average is easy. Here’s how we get the average of the current month and the two preceding months (when they’re in the data set):

        .partitioned_rolling_average(
            |Tup2(l, v)| (l.clone(), *v),
            RelRange::new(RelOffset::Before(2), RelOffset::Before(0)))

As the name of the function suggests, partitioned_rolling_average computes a rolling average within a partition. In this case, we partition the data by country. The first argument of the function is a closure that splits the value component of the input indexed Z-set into a partition key and a value. partitioned_rolling_average returns a partitioned indexed Z-set (OrdPartitionedIndexedZSet). This is just an indexed Z-set in which the key is the “partition” within which averaging occurs (for us, this is the country), and the value is a tuple of a “timestamp” and a value. Note that the value type has an Option wrapped around it. In our case, for example, the input value type was i64, so the output value type is Option<i64>. The output for a given row is None if there are no rows in the window, which can only happen if the range passed in does not include the 0 relative offset (i.e. the current row). Ours does include 0, so None will never occur in our output.

Let’s re-map to recover year and month from the timestamp that we made and to strip off the Option:

        .map_index(|(l, Tup2(date, avg))| (Tup3(l.clone(), date / 12, date % 12 + 1), avg.unwrap()));

If we adjust the build_circuit return type and return value, like shown below, the existing code in main will print it just fine.

fn build_circuit(
    circuit: &mut RootCircuit,
) -> Result<(
    ZSetHandle<Record>,
    OutputHandle<OrdIndexedZSet<Tup3<String, u32, u32>, ZWeight>>,
)> {
    // ...
    Ok((input_handle, moving_averages.output()))
}

The output looks like this (you can verify that the second row is the average of the first two rows in the previous output, and so on):

England          2021-01    5600174: +1
England          2021-02    7488796: +1
England          2021-03    8946255: +1
England          2021-04   10842512: +1
England          2021-05   12307688: +1
England          2021-06   12002268: +1
...
Northern Ireland 2021-01     150315: +1
Northern Ireland 2021-02     233694: +1
...
Wales            2021-01     295057: +1
Wales            2021-02     458273: +1
Wales            2021-03     584463: +1

The whole program is in tutorial5.rs.

§Joins

Suppose we want both the current month’s vaccination count and the moving average together. With enough work, we could get them with just aggregation by writing our own “aggregator” (Aggregator). It’s a little easier to do a join, and it gives us a chance to show how to do that. Both our monthly vaccination counts and our moving averages are indexed Z-sets with the same key type.

The first step is to take the code we’ve written so far and change the final map_index on moving_averages to include a couple of casts, so that monthly_totals and moving_averages have exactly the same key type (both (String, i32, u8)). The new call looks like this; only the as <type> parts are new:

        .map_index(|(l, Tup2(date, avg))| {
            (
                Tup3(l.clone(), (date / 12) as i32, (date % 12 + 1) as u8),
                avg.unwrap(),
            )
        });

Then we can use join_index to do the join and inspect to print the results, as shown below. Besides the streams to join, join_index take a closure, which it calls for every pair of records with equal keys in the streams, passing in the common key and each stream’s value. The closure must return an iterable collection of output key-value pair tuples. By returning a collection, the join can output any number of output records per input pairing.

We want to output a single record per input pair. The Rust standard library Option type’s Some variant is an iterable collection that has exactly one value, so it’s convenient for this purpose:

    let joined = monthly_totals.join_index(&moving_averages, |Tup3(l, y, m), cur, avg| {
        Some((Tup3(l.clone(), *y, *m), Tup2(*cur, *avg)))
    });

We need to adjust the build_circuit return type and value and make main print the new kind of output:

fn build_circuit(
    circuit: &mut RootCircuit,
) -> Result<(
    ZSetHandle<Record>,
    OutputHandle<OrdIndexedZSet<Tup3<String, i32, u8>, Tup2<i64, i64>>>,
)> {
    let (input_stream, input_handle) = circuit.add_input_zset::<Record>();
    Ok((input_handle, joined.output()))
}

fn main() -> Result<()> {
    let (circuit, (input_handle, output_handle)) = RootCircuit::build(build_circuit)?;
    output_handle
        .consolidate()
        .iter()
        .for_each(|(Tup3(l, y, m), Tup2(cur, avg), w)| {
            println!("{l:16} {y}-{m:02} {cur:10} {avg:10}: {w:+}")
        });
    Ok(())
}

The whole program is in tutorial6.rs. If we run it, it prints both per-month vaccination numbers and 3-month moving averages:

England          2021-01    5600174    5600174: +1
England          2021-02    9377418    7488796: +1
England          2021-03   11861175    8946255: +1
England          2021-04   11288945   10842512: +1
England          2021-05   13772946   12307688: +1
England          2021-06   10944915   12002268: +1
...

§Finding months with the most vaccinations

Suppose we want to find the months when the most vaccinations occurred in each country. Stream has a ready-made method Stream::topk_desc for this, which we simply pass the number of top records to keep per group. (If we only want the greatest value, rather than the top-k for k > 1, then Stream::aggregate with the Max aggregator also works.)

There is one tricky part. To use topk_desc, we must re-index our data so that the country is the key (used for grouping) and the number of vaccinations is the value. But, if we do that in the most obvious way, we end up with just the number of vaccinations as the result, whereas we probably want to know the year and month that that number of occurred as well.

One way to recover the year and month is to join against the original data. We can do it without another join by defining a type that is ordered by vaccination count but also contains the year and month. Taking advantage of how Rust derives Ord lexicographically, that’s as simple as:


#[derive(
    Clone,
    Default,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Hash,
    SizeOf,
    Archive,
    Serialize,
    rkyv::Deserialize,
    serde::Deserialize,
    feldera_macros::IsNone,
)]
#[archive_attr(derive(Ord, Eq, PartialEq, PartialOrd))]
struct VaxMonthly {
    count: u64,
    year: i32,
    month: u8,
}

We can transform our monthly totals from a (country, year, month) key and vaccinations value in country value and VaxMonthly value with a call to map_index, then just call topk_desc:

    let most_vax = monthly_totals
        .map_index(|(Tup3(l, y, m), sum)| {
            (
                l.clone(),
                VaxMonthly {
                    count: *sum as u64,
                    year: *y,
                    month: *m,
                },
            )
        })
        .topk_desc(3);

Then we just adjust build_circuit return type and value and print the new output type in main:

fn build_circuit(
    circuit: &mut RootCircuit,
) -> Result<(
    ZSetHandle<Record>,
    OutputHandle<OrdIndexedZSet<String, VaxMonthly>>,
)> {
    // ...
    Ok((input_handle, most_vax.output()))
}

fn main() -> Result<()> {
    // ...
    output_handle
        .consolidate()
        .iter()
        .for_each(|(l, VaxMonthly { count, year, month }, w)| {
            println!("{l:16} {year}-{month:02} {count:10}: {w:+}")
        });
    Ok(())
}

The complete program is in tutorial7.rs. When we run it, it outputs the following. The output is sorted in increasing order, which might be a bit surprising, but that’s because DBSP Z-sets always iterate in that order. If it’s important to produce output in another order, one could define custom Ord and PartialOrd on VaxMonthly:

England          2021-03   11861175: +1
England          2021-05   13772946: +1
England          2021-12   14801300: +1
Northern Ireland 2021-05     394047: +1
Northern Ireland 2021-04     436870: +1
Northern Ireland 2021-12     489059: +1
Scotland         2021-06    1244155: +1
Scotland         2021-05    1272194: +1
Scotland         2021-12    1388549: +1
Wales            2021-04     707714: +1
Wales            2021-12     822945: +1
Wales            2021-03     836844: +1

§Vaccination rates

Suppose we want to compare the population in each country with the number of vaccinations given, that is, to calculate a vaccination rate. Our input data contains a total_vaccinations_per_hundred column that reports this information, but let’s calculate it ourselves to demonstrate how it might be done in DBSP.

We need to know the population in each country. Let’s add a new input source for population by country. Since that’s naturally a set of key-value pairs, we’ll make it an indexed Z-set input source instead of a plain Z-set. The first step is to make our circuit builder construct the source and return its handle. At the same time, we can prepare for our output to be an indexed Z-set with (location, year, month) key and (vaxxes, population) value:

fn build_circuit(
    circuit: &mut RootCircuit,
) -> Result<(
    ZSetHandle<Record>,
    IndexedZSetHandle<String, u64>,
    OutputHandle<OrdIndexedZSet<Tup3<String, i32, u8>, Tup2<i64, u64>>>,
)> {
    let (vax_stream, vax_handle) = circuit.add_input_zset::<Record>();
    let (pop_stream, pop_handle) = circuit.add_input_indexed_zset::<String, u64>();
    // ...
    Ok((vax_handle, pop_handle, vax_rates.output()))
}

The code in main needs to receive the additional handle and feed data into it. Let’s feed in fixed data this time:

fn main() -> Result<()> {
    let (circuit, (vax_handle, pop_handle, output_handle)) = RootCircuit::build(build_circuit)?;

    // ...
    let mut pop_records = vec![
        Tup2("England".into(), Tup2(56286961u64, 1i64)),
        Tup2("Northern Ireland".into(), Tup2(1893667, 1)),
        Tup2("Scotland".into(), Tup2(5463300, 1)),
        Tup2("Wales".into(), Tup2(3152879, 1)),
    ];
    pop_handle.append(&mut pop_records);
    // ...

The calculation of monthly totals stays the same. Starting from those, we calculate running vaccination totals, which requires first re-indexing. Then we join against the population table, which also requires a re-index step:

    let running_monthly_totals = monthly_totals
        .map_index(|(Tup3(l, y, m), v)| (*y as u32 * 12 + (*m as u32 - 1), Tup2(l.clone(), *v)))
        .partitioned_rolling_aggregate_linear(
            |Tup2(l, v)| (l.clone(), *v),
            |vaxxed| *vaxxed,
            |total| total,
            RelRange::new(RelOffset::Before(u32::MAX), RelOffset::Before(0)),
        );
    let vax_rates = running_monthly_totals
        .map_index(|(l, Tup2(date, total))| {
            (
                l.clone(),
                Tup3((date / 12) as i32, (date % 12 + 1) as u8, total.unwrap()),
            )
        })
        .join_index(&pop_stream, |l, Tup3(y, m, total), pop| {
            Some((Tup3(l.clone(), *y, *m), Tup2(*total, *pop)))
        });

And finally we adjust main to print the results:

    output_handle
        .consolidate()
        .iter()
        .for_each(|(Tup3(l, y, m), Tup2(vaxxes, pop), w)| {
            let rate = vaxxes as f64 / pop as f64 * 100.0;
            println!("{l:16} {y}-{m:02}: {vaxxes:9} {pop:8} {rate:6.2}% {w:+}",)
        });

The complete program is in tutorial8.rs. If we run it, it prints, in part, the following. The percentages over 100% are correct because this data counts vaccination doses rather than people:

England          2021-01:   5600174 56286961   9.95% +1
England          2021-02:  14977592 56286961  26.61% +1
England          2021-03:  26838767 56286961  47.68% +1
England          2021-04:  38127712 56286961  67.74% +1
England          2021-05:  51900658 56286961  92.21% +1
England          2021-06:  62845573 56286961 111.65% +1
...

§Incremental computation

DBSP shines when data arrive item by item or in batches, because its internals work “incrementally”, that is, they do only as much (re)computation as needed to reflect the input changes, rather than recalculating everything from an empty state.

Our examples so far have fed all of the input data into the circuit in one go. To demonstrate incremental computation, we can simulate data arriving over time by dividing our CSV file into batches and separately pushing each batch into an individual step of the circuit. tutorial9.rs does that: it is a copy of the program from Finding months with the most vaccinations modified so that it feeds data in at most 500 records per step. The only changes from the previous version are in main, which becomes:

fn main() -> Result<()> {
    let (circuit, (input_handle, output_handle)) = RootCircuit::build(build_circuit)?;

    let path = format!(
        "{}/examples/tutorial/vaccinations.csv",
        env!("CARGO_MANIFEST_DIR")
    );
    let mut reader = Reader::from_path(path)?;
    let mut input_records = reader.deserialize();
    loop {
        let mut batch = Vec::new();
        while batch.len() < 500 {
            let Some(record) = input_records.next() else {
                break;
            };
            batch.push(Tup2(record?, 1));
        }
        if batch.is_empty() {
            break;
        }
        println!("Input {} records:", batch.len());
        input_handle.append(&mut batch);

        circuit.transaction()?;

        output_handle
            .consolidate()
            .iter()
            .for_each(|(l, VaxMonthly { count, year, month }, w)| {
                println!("   {l:16} {year}-{month:02} {count:10}: {w:+}")
            });
        println!();
    }
    Ok(())
}

The output of our new program starts out like the following. The first batch of 500 records are data for England, so the program outputs the top 3. The second batch introduces some of the data for Northern Ireland, so the program outputs the initial top 3. The third batch includes a month with more vaccinations than the previous record, so it “retracts” (deletes) the previous 3rd-place record, indicated by the output record with -1 weight, and inserts the new record:

Input 500 records:
   England          2021-03   11861175: +1
   England          2021-05   13772946: +1
   England          2021-12   14801300: +1

Input 500 records:
   Northern Ireland 2021-03     328990: +1
   Northern Ireland 2021-05     394047: +1
   Northern Ireland 2021-04     436870: +1

Input 500 records:
   Northern Ireland 2021-03     328990: -1
   Northern Ireland 2021-12     489059: +1

§Fixed-point computation

Fixed-point computations are useful if you want to repeat a query until its result does not change anymore. Then, a fixed-point is reached and the query processing terminates, yielding the result. SQL provides this mechanism through recursive common table expressions (CTEs).

A classical use case for a fixed-point computation is the transitive closure of a graph. We demonstrate how to compute it for a weighted, directed (acyclic) graph with DBSP. We assume that the graph’s edges are stored in (from, to, weight) triples, and are interested to find out (1) which other nodes are reachable from any node of the graph, (2) at what cost (by cumulating the paths’ weights), and (3) how many hops are required for that path (through counting the paths’ edges). Hence, the query’s output schema is (start_node, end_node, cumulated_weight, hop_count) quadruples.

Next to joins, the code also shows how to use nested circuits. We have two execution contexts: a root circuit and a child circuit. The root circuit is the one that is built by the parameter to the RootCircuit::build function. The child circuit is defined by the parameter to the ChildCircuit<()>::recursive function. We also make use of the delta0 operator to import streams from a parent circuit into a child circuit. Finally, we pick up the incremental computation aspect from the previous section by feeding in data in two steps. The first step inserts this toy graph:

|0| -1-> |1| -1-> |2| -2-> |3| -2-> |4|

In the second step, we remove the edge from |1| -1-> |2| and are left with this graph containing two connected components:

|0| -1-> |1|      |2| -2-> |3| -2-> |4|

DBSP then tells us how the transitive closure computed in the first step changes in response to this input change in the second step.

use anyhow::Result;
use dbsp::{
    operator::Generator,
    utils::{Tup3, Tup4},
    zset, zset_set, Circuit, OrdZSet, RootCircuit, Stream, IndexedZSetReader
};

fn main() -> Result<()> {
    const STEPS: usize = 2;

    let (circuit_handle, output_handle) = RootCircuit::build(move |root_circuit| {
        let mut edges_data = ([
            zset_set! { Tup3(0_usize, 1_usize, 1_usize), Tup3(1, 2, 1), Tup3(2, 3, 2), Tup3(3, 4, 2) },
            zset! { Tup3(1, 2, 1) => -1 },
        ] as [_; STEPS])
        .into_iter();

        let edges = root_circuit.add_source(Generator::new(move || edges_data.next().unwrap()));

        // Create a base stream with all paths of length 1.
        let len_1 = edges.map(|Tup3(from, to, weight)| Tup4(*from, *to, *weight, 1));

        let closure = root_circuit.recursive(
            |child_circuit, len_n_minus_1: Stream<_, OrdZSet<Tup4<usize, usize, usize, usize>>>| {
                // Import the `edges` and `len_1` stream from the parent circuit
                // through the `delta0` operator.
                let edges = edges.delta0(child_circuit);
                let len_1 = len_1.delta0(child_circuit);

                // Perform an iterative step (n-1 to n) through joining the
                // paths of length n-1 with the edges.
                let len_n = len_n_minus_1
                    .map_index(|Tup4(start, end, cum_weight, hopcnt)| {
                        (*end, Tup4(*start, *end, *cum_weight, *hopcnt))
                    })
                    .join(
                        &edges
                            .map_index(|Tup3(from, to, weight)| (*from, Tup3(*from, *to, *weight))),
                        |_end_from,
                         Tup4(start, _end, cum_weight, hopcnt),
                         Tup3(_from, to, weight)| {
                            Tup4(*start, *to, cum_weight + weight, hopcnt + 1)
                        },
                    )
                    // You can think of the `plus` operator to something
                    // similar to the `union` operator in SQL.
                    .plus(&len_1);

                Ok(len_n)
            },
        )?;

        let mut expected_outputs = ([
            // We expect the full transitive closure in the first step.
            zset! {
                Tup4(0, 1, 1, 1) => 1,
                Tup4(0, 2, 2, 2) => 1,
                Tup4(0, 3, 4, 3) => 1,
                Tup4(0, 4, 6, 4) => 1,
                Tup4(1, 2, 1, 1) => 1,
                Tup4(1, 3, 3, 2) => 1,
                Tup4(1, 4, 5, 3) => 1,
                Tup4(2, 3, 2, 1) => 1,
                Tup4(2, 4, 4, 2) => 1,
                Tup4(3, 4, 2, 1) => 1,
            },
            // These paths are removed in the second step.
            zset! {
                Tup4(0, 2, 2, 2) => -1,
                Tup4(0, 3, 4, 3) => -1,
                Tup4(0, 4, 6, 4) => -1,
                Tup4(1, 2, 1, 1) => -1,
                Tup4(1, 3, 3, 2) => -1,
                Tup4(1, 4, 5, 3) => -1,
            },
        ] as [_; STEPS])
            .into_iter();

        closure.inspect(move |output| {
            assert_eq!(*output, expected_outputs.next().unwrap());
        });

        Ok(closure.output())
    })?;

    for i in 0..STEPS {
        let iteration = i + 1;
        println!("Iteration {} starts...", iteration);
        circuit_handle.transaction()?;
        output_handle.consolidate().iter().for_each(
            |(Tup4(start, end, cum_weight, hopcnt), _, z_weight)| {
                println!(
                    "{start} -> {end} (cum weight: {cum_weight}, hops: {hopcnt}) => {z_weight}"
                );
            },
        );
        println!("Iteration {} finished.", iteration);
    }

    Ok(())
}

We point out that introducing a cycle to the graph prevents this fixed-point computation from terminating because then there is no fixed-point anymore. To demonstrate this, we introduce a third step which feeds back in the previously removed edge |1| -1-> |2| and, additionally, introduces the edge |4| -3-> |0|, forming a cyclic graph. In total, we obtain the following graph:

|0| -1-> |1| -1-> |2| -2-> |3| -2-> |4|
 ^                                   |
 |                                   |
 ------------------3------------------

The code remains unchanged except for the changes in input data. Yet, we do not find a fixed-point anymore because we can endlessly walk cycles, due to ever growing cumulated weights and hop counts for already discovered pairs of reachable nodes. If you want to see this in action, we invite you to play around with tutorial10.rs.

To fix this issue, we have to change the code to stop iterating once the shortest path for each pair of nodes has been discovered. One approach to achieve this is to group by each pair and use the Min aggregation operator to only retain the shortest path for each pair. Aggregation requires to index the stream, so there are more code changes required than shown here. You can find the full code in tutorial11.rs but the important changes take place within the child circuit:

type Accumulator =
    Stream<NestedCircuit, OrdIndexedZSet<Tup2<usize, usize>, Tup4<usize, usize, usize, usize>>>;

let closure = root_circuit.recursive(|child_circuit, len_n_minus_1: Accumulator| {
    // Import the `edges` and `len_1` stream from the parent circuit.
    let edges = edges.delta0(child_circuit);
    let len_1 = len_1.delta0(child_circuit);

    // Perform an iterative step (n-1 to n) through joining the
    // paths of length n-1 with the edges.
    let len_n = len_n_minus_1
        .map_index(
            |(Tup2(_start, _end), Tup4(start, end, cum_weight, hopcnt))| {
                (*end, Tup4(*start, *end, *cum_weight, *hopcnt))
            },
        )
        // We now use `join_index` instead of `join` to index the stream on node pairs.
        .join_index(
            &edges.map_index(|Tup3(from, to, weight)| (*from, Tup3(*from, *to, *weight))),
            |_end_from, Tup4(start, _end, cum_weight, hopcnt), Tup3(_from, to, weight)| {
                Some((
                    Tup2(*start, *to),
                    Tup4(*start, *to, cum_weight + weight, hopcnt + 1),
                ))
            },
        )
        .plus(&len_1)
        // Here, we use the `aggregate` operator to only keep the shortest path.
        .aggregate(Min);

    Ok(len_n)
})?;

Keep in mind that this is just one way to fix the issue and that in general recursive queries with aggregates are not guaranteed to converge to the optimum of the aggregation function (here, the minimum function), even though there exists a finite solution.

§Next steps

We’ve shown how input, computation, and output work in DBSP. That’s all the basics. A good next step could be to look through the methods available on Stream for computation.

As a final note, we used RootCircuit::build to create our circuits. That method creates circuits that run in the current thread. DBSP also provides a multithreaded runtime. To run our circuit in 4 worker threads instead of in the current thread is as simple as importing dbsp::Runtime and then changing

let (circuit, (/*handles*/)) = RootCircuit::build(build_circuit)?;

to:

let (mut circuit, (/*handles*/)) = Runtime::init_circuit(4, build_circuit)?;

  1. The term “circuit” is used because diagrams of DBSP computation resemble those for electrical circuits. DBSP circuits are not necessarily closed loops like electrical circuits.