dbsp 0.287.0

Continuous streaming analytics engine
Documentation
use anyhow::Result;
use chrono::Datelike;
use csv::Reader;
use dbsp::typed_batch::IndexedZSetReader;
use dbsp::{
    OrdIndexedZSet, OutputHandle, RootCircuit, ZSetHandle, ZWeight,
    utils::{Tup2, Tup3},
};
use feldera_macros::IsNone;
use rkyv::{Archive, Serialize};
use size_of::SizeOf;

/// One row of the vaccination input data.
///
/// `main` deserializes these from the CSV file via `serde::Deserialize` and
/// feeds them into the circuit's input Z-set; `build_circuit` then filters
/// and aggregates them.
#[derive(
    Clone,
    Default,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Hash,
    SizeOf,
    Archive,
    Serialize,
    rkyv::Deserialize,
    serde::Deserialize,
    IsNone,
)]
#[archive_attr(derive(Ord, Eq, PartialEq, PartialOrd))]
struct Record {
    /// Name of the location the row refers to. `build_circuit` keeps only
    /// rows whose location is one of "England", "Northern Ireland",
    /// "Scotland" or "Wales".
    location: String,
    /// Day number of the row's date, in the form accepted by
    /// `chrono::NaiveDate::from_epoch_days` (which `build_circuit` uses to
    /// turn it into a calendar date).
    date: i32,
    /// Vaccinations reported for that day. `build_circuit` treats `None` as
    /// zero. NOTE(review): presumably `None` corresponds to an empty CSV
    /// field -- confirm against the data file.
    daily_vaccinations: Option<u64>,
}

/// Vaccination total for one calendar month.
///
/// This is the value type of the indexed Z-set produced by `build_circuit`,
/// where the key is the location name.
///
/// The derived `Ord` compares fields in declaration order, so values are
/// ordered by `count` first, then `year`, then `month`. Keep `count` as the
/// first field: `build_circuit` calls `topk_desc(3)` on these values, which
/// presumably ranks them by this ordering.
#[derive(
    Clone,
    Default,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Hash,
    SizeOf,
    Archive,
    Serialize,
    rkyv::Deserialize,
    serde::Deserialize,
    IsNone,
)]
#[archive_attr(derive(Ord, Eq, PartialEq, PartialOrd))]
#[archive(compare(PartialEq, PartialOrd))]
struct VaxMonthly {
    /// Sum of the daily vaccination counts for the month.
    count: u64,
    /// Calendar year, as returned by `chrono::Datelike::year`.
    year: i32,
    /// Calendar month, as returned by `chrono::Datelike::month` (narrowed to
    /// `u8`).
    month: u8,
}

/// Assembles the vaccination dataflow inside `circuit`.
///
/// The pipeline:
/// 1. accepts [`Record`]s through an input Z-set,
/// 2. keeps only the records for England, Northern Ireland, Scotland and
///    Wales,
/// 3. totals `daily_vaccinations` per `(location, year, month)`,
/// 4. re-indexes the totals by location and retains the three largest
///    [`VaxMonthly`] values for each location.
///
/// Returns the handle used to push input records together with the handle
/// from which the per-location top-3 output can be read.
///
/// # Panics
///
/// The indexing closure panics if a record's `date` is rejected by
/// `chrono::NaiveDate::from_epoch_days`.
// The return type is a tuple of two generic handles; a type alias would add
// nothing for a single use.
#[allow(clippy::type_complexity)]
fn build_circuit(
    circuit: &mut RootCircuit,
) -> Result<(
    ZSetHandle<Record>,
    OutputHandle<OrdIndexedZSet<String, VaxMonthly>>,
)> {
    let (records, records_handle) = circuit.add_input_zset::<Record>();

    // Restrict the stream to the four locations of interest.
    let uk_records = records.filter(|record| {
        matches!(
            record.location.as_str(),
            "England" | "Northern Ireland" | "Scotland" | "Wales"
        )
    });

    // Key every record by (location, year, month) and add up the daily
    // counts that share a key. A missing daily count contributes zero.
    let totals_by_month = uk_records
        .map_index(|record| {
            let day = chrono::NaiveDate::from_epoch_days(record.date).unwrap();
            let key = Tup3(record.location.clone(), day.year(), day.month() as u8);
            let doses = record.daily_vaccinations.unwrap_or(0);
            (key, doses)
        })
        .aggregate_linear(|doses| *doses as ZWeight);

    // Move year and month from the key into the value so that each location
    // owns a set of monthly totals, then keep the three largest per location.
    let top_months = totals_by_month
        .map_index(|(key, total)| {
            let Tup3(location, year, month) = key;
            let monthly = VaxMonthly {
                count: *total as u64,
                year: *year,
                month: *month,
            };
            (location.clone(), monthly)
        })
        .topk_desc(3);

    Ok((records_handle, top_months.output()))
}

/// Runs the vaccination example end to end.
///
/// Builds the circuit, then reads `examples/tutorial/vaccinations.csv`
/// (relative to the crate's manifest directory) in batches of at most 500
/// records. For each batch it pushes the records into the circuit with
/// weight 1, runs one transaction, and prints the output produced by that
/// transaction, one line per `(location, month, weight)` entry.
///
/// # Errors
///
/// Returns an error if the circuit cannot be built, the CSV file cannot be
/// opened, a row fails to deserialize, or a transaction fails.
fn main() -> Result<()> {
    // Upper bound on the number of records submitted per transaction.
    const BATCH_SIZE: usize = 500;

    let (circuit, (input_handle, output_handle)) = RootCircuit::build(build_circuit)?;

    let csv_path = format!(
        "{}/examples/tutorial/vaccinations.csv",
        env!("CARGO_MANIFEST_DIR")
    );
    let mut csv_reader = Reader::from_path(csv_path)?;
    let mut rows = csv_reader.deserialize();

    loop {
        // Pull up to BATCH_SIZE rows; `by_ref` leaves the remaining rows in
        // place for the next pass through the loop.
        let mut batch = Vec::new();
        for row in rows.by_ref().take(BATCH_SIZE) {
            batch.push(Tup2(row?, 1));
        }

        // Nothing left to read: the input is exhausted.
        if batch.is_empty() {
            break;
        }

        println!("Input {} records:", batch.len());
        input_handle.append(&mut batch);

        circuit.transaction()?;

        let changes = output_handle.consolidate();
        for (l, VaxMonthly { count, year, month }, w) in changes.iter() {
            println!("   {l:16} {year}-{month:02} {count:10}: {w:+}");
        }
        println!();
    }

    Ok(())
}