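//! dbsp 0.287.0 — continuous streaming analytics engine.
//!
//! Documentation example: load vaccination records from a CSV file into a
//! DBSP circuit and print how many of them belong to the four UK nations.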
use std::path::Path;

use anyhow::Context;
use anyhow::Result;
use csv::Reader;
use dbsp::utils::Tup2;
use dbsp::{OrdZSet, OutputHandle, RootCircuit, ZSet, ZSetHandle, ZWeight};
use feldera_macros::IsNone;
use rkyv::{Archive, Serialize};
use size_of::SizeOf;

/// One row of the tutorial's vaccination CSV, used as the element type of
/// the circuit's input Z-set.
///
/// The derives provide comparison/hashing, `SizeOf`, rkyv archiving, and
/// `serde::Deserialize` so the `csv` reader can decode rows into this type.
///
/// Field order matters: the derived `Ord`/`PartialOrd` compare
/// lexicographically by `location`, then `date`, then `daily_vaccinations`,
/// so do not reorder fields casually.
#[derive(
    Clone,
    Default,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Hash,
    SizeOf,
    Archive,
    Serialize,
    rkyv::Deserialize,
    serde::Deserialize,
    IsNone,
)]
// Derive the same ordering/equality traits on the rkyv-archived form.
#[archive_attr(derive(Ord, Eq, PartialEq, PartialOrd))]
struct Record {
    /// Region name (e.g. "England"); the circuit filters on this value.
    location: String,
    /// NOTE(review): a vaccination CSV usually stores dates as text such as
    /// `2021-01-10`; an `i32` only deserializes if this file stores integers
    /// — confirm against vaccinations.csv.
    date: i32,
    /// Daily dose count; optional, so rows with no value here can still be
    /// deserialized (presumably an empty cell maps to `None`).
    daily_vaccinations: Option<u64>,
}
/// Wires up the tutorial circuit inside `circuit`.
///
/// The circuit has three parts:
/// * a Z-set input of [`Record`]s;
/// * a debug probe that prints the weighted count of every input batch;
/// * a filter that keeps only records whose `location` is one of the four
///   UK nations.
///
/// Returns the handle used to push input records and the output handle for
/// the filtered Z-set. This function never fails; the `Result` return type
/// matches what `RootCircuit::build` expects from its constructor closure.
fn build_circuit(
    circuit: &mut RootCircuit,
) -> Result<(ZSetHandle<Record>, OutputHandle<OrdZSet<Record>>)> {
    let (records, records_handle) = circuit.add_input_zset::<Record>();

    // Debug probe: report the total weight of each batch entering the circuit.
    records.inspect(|batch| println!("{}", batch.weighted_count()));

    let uk_records = records.filter(|record| {
        matches!(
            record.location.as_str(),
            "England" | "Northern Ireland" | "Scotland" | "Wales"
        )
    });

    let uk_output = uk_records.output();
    Ok((records_handle, uk_output))
}

fn main() -> Result<()> {
    // Build circuit.
    let (circuit, (input_handle, output_handle)) = RootCircuit::build(build_circuit)?;

    // Feed data into circuit.
    let path = format!(
        "{}/examples/tutorial/vaccinations.csv",
        env!("CARGO_MANIFEST_DIR")
    );
    let mut input_records = Reader::from_path(path)?
        .deserialize()
        .map(|result| result.map(|record| Tup2(record, 1)))
        .collect::<Result<Vec<Tup2<Record, ZWeight>>, _>>()?;
    input_handle.append(&mut input_records);

    // Execute circuit.
    circuit.transaction()?;

    // Read output from circuit.
    println!("{}", output_handle.consolidate().weighted_count());

    Ok(())
}