// dbsp 0.290.0
//
// Continuous streaming analytics engine
// Documentation
use anyhow::Result as AnyResult;
use chrono::Datelike;
use clap::Parser;
use dbsp::circuit::CircuitConfig;
use dbsp::typed_batch::IndexedZSetReader;
use dbsp::utils::Tup2;
use dbsp::{
    DBSPHandle, OrdIndexedZSet, OutputHandle, RootCircuit, Runtime, ZSetHandle, circuit::Layout,
    utils::Tup3,
};
use futures::{future, prelude::*};
use std::{
    net::SocketAddr,
    sync::{Arc, Mutex, MutexGuard},
};
use tarpc::{
    context,
    serde_transport::tcp::listen,
    server::{self, Channel, incoming::Incoming},
    tokio_serde::formats::Bincode,
};

mod service;
use service::*;

// Command-line arguments of the server binary, parsed by `clap`'s derive
// `Parser`.
//
// NOTE(review): plain `//` comments are used here on purpose. clap's derive
// turns `///` doc comments on the struct and its fields into `--help` text, so
// adding or editing doc comments would change the program's output.
#[derive(Debug, Clone, Parser)]
struct Args {
    /// IP address and TCP port to listen, in the form `<ip>:<port>`.  If
    /// `<port>` is `0`, the kernel will pick a free port.
    // Exposed as the `--address` long option; defaults to loopback with port 0.
    #[clap(long, default_value = "127.0.0.1:0")]
    address: SocketAddr,
}

/// Builds the vaccination circuit inside `circuit`.
///
/// The pipeline:
/// 1. keeps only records whose `location` is England, Northern Ireland,
///    Scotland or Wales;
/// 2. keys each record by `(location, year, month)` and sums
///    `daily_vaccinations` per key (a missing count contributes 0);
/// 3. re-indexes the monthly totals by location and keeps the top 3
///    `VaxMonthly` entries per location via `topk_desc(3)` (ranking follows
///    `VaxMonthly`'s `Ord` impl, which is defined outside this file —
///    presumably by `count` first; confirm in `service`).
///
/// Returns the handle used to feed input records and the handle from which
/// the per-location results are read.
///
/// # Panics
///
/// The mapping closure panics if `NaiveDate::from_epoch_days` rejects a
/// record's `date`.
// The return type is long but only used here, so the lint is silenced.
#[allow(clippy::type_complexity)]
fn build_circuit(
    circuit: &mut RootCircuit,
) -> AnyResult<(
    ZSetHandle<Record>,
    OutputHandle<OrdIndexedZSet<String, VaxMonthly>>,
)> {
    let (records, input_handle) = circuit.add_input_zset::<Record>();

    // Restrict the data set to the four locations of interest.
    let uk_records = records.filter(|rec| {
        matches!(
            rec.location.as_str(),
            "England" | "Northern Ireland" | "Scotland" | "Wales"
        )
    });

    // Key every record by (location, year, month); the value is that day's count.
    let by_month = uk_records.map_index(|rec| {
        let day = chrono::NaiveDate::from_epoch_days(rec.date).unwrap();
        let key = Tup3(rec.location.clone(), day.year(), day.month() as u8);
        let vaccinations = rec.daily_vaccinations.unwrap_or(0);
        (key, vaccinations)
    });
    let monthly_totals = by_month.aggregate_linear(|count| *count as i64);

    // Regroup the monthly sums under their location so top-k runs per location.
    let by_location = monthly_totals.map_index(|(Tup3(location, year, month), total)| {
        let entry = VaxMonthly {
            count: *total as u64,
            year: *year,
            month: *month,
        };
        (location.clone(), entry)
    });
    let top_months = by_location.topk_desc(3);

    Ok((input_handle, top_months.output()))
}

/// A running circuit together with the handles needed to drive it.
///
/// All three values come from a single `Runtime::init_circuit` call (see
/// `Inner::new`).
struct Inner {
    /// Handle to the running circuit; `transaction()` is called on it to
    /// process the records appended since the previous step.
    circuit: DBSPHandle,
    /// Input side of the circuit: `(Record, weight)` pairs are appended here.
    input_handle: ZSetHandle<Record>,
    /// Output side of the circuit: results indexed by location name, read with
    /// `consolidate()` after a transaction.
    output_handle: OutputHandle<OrdIndexedZSet<String, VaxMonthly>>,
}

impl Inner {
    /// Starts a circuit built by `build_circuit` with the given configuration
    /// and bundles the circuit handle with its input and output handles.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `Runtime::init_circuit`.
    fn new(layout: impl Into<CircuitConfig>) -> AnyResult<Inner> {
        let (circuit, handles) = Runtime::init_circuit(layout, build_circuit)?;
        let (input_handle, output_handle) = handles;

        Ok(Self {
            circuit,
            input_handle,
            output_handle,
        })
    }
}

/// Cheaply cloneable handle to the server state.
///
/// Every clone shares the same mutex-protected slot through the `Arc`.  The
/// slot holds `None` until a circuit has been installed with `replace`, and
/// `Some(Inner)` afterwards.
#[derive(Clone)]
struct Server(Arc<Mutex<Option<Inner>>>);

impl Server {
    fn new() -> Server {
        Server(Arc::new(Mutex::new(None)))
    }
    fn inner(&self) -> MutexGuard<'_, Option<Inner>> {
        self.0.lock().unwrap()
    }
    fn replace(&self, layout: impl Into<CircuitConfig>) {
        let mut inner = self.inner();

        // First clear the old server, if any, and clean up.  It's important to
        // do this first in case the old server is using resources that the new
        // server will also need (e.g. listening on ports).
        drop(inner.take());

        *inner = Some(Inner::new(layout).unwrap());
    }
}
/// RPC entry points of the `Circuit` service, backed by the shared `Server`
/// state.
impl Circuit for Server {
    /// Replaces the current circuit (if any) with a new one configured from
    /// `layout`.
    ///
    /// Panics if the new circuit cannot be created (see `Server::replace`).
    async fn init(self, _: context::Context, layout: Layout) {
        self.replace(layout)
    }

    /// Feeds `records` into the circuit, runs one transaction, and returns the
    /// consolidated output as `(location, monthly entry, weight)` tuples.
    ///
    /// The state lock is taken once and held for the whole
    /// append → transaction → read sequence, so requests arriving concurrently
    /// on other connections (another `run`, or an `init` that swaps the
    /// circuit) cannot interleave between the steps.  Nothing is awaited while
    /// the guard is alive.
    ///
    /// # Panics
    ///
    /// Panics if no circuit has been installed via `init` yet, or if the
    /// circuit transaction fails.
    async fn run(
        self,
        _: context::Context,
        mut records: Vec<Tup2<Record, i64>>,
    ) -> Vec<(String, VaxMonthly, i64)> {
        let mut state = self.inner();
        let inner = state
            .as_mut()
            .expect("`run` called before the circuit was set up with `init`");

        inner.input_handle.append(&mut records);
        inner
            .circuit
            .transaction()
            .expect("circuit transaction failed");

        // Materialize the result into an owned vector before the borrowed
        // batch and the lock guard go out of scope.
        let batch = inner.output_handle.consolidate();
        let results: Vec<(String, VaxMonthly, i64)> = batch.iter().collect();
        results
    }
}

/// Hands `fut` to the Tokio runtime as a background task and returns without
/// waiting for it to finish.
async fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
    // The join handle is discarded, so the task runs detached.
    drop(tokio::spawn(fut));
}

/// Parses the command line, binds the listener, prints the chosen port, and
/// then serves `Circuit` requests on every accepted connection until the
/// incoming stream ends.
#[tokio::main]
async fn main() -> AnyResult<()> {
    let args = Args::parse();

    let mut incoming = listen(args.address, Bincode::default).await?;
    // Report the real port: with port 0 the kernel picks one.
    println!("Listening on port {}", incoming.local_addr().port());
    incoming.config_mut().max_frame_length(usize::MAX);

    let server = Server::new();
    let connections = incoming
        // Connections that failed to be accepted are silently skipped.
        .filter_map(|accepted| future::ready(accepted.ok()))
        .map(server::BaseChannel::with_defaults)
        // Channels are keyed by peer IP address, with a limit of 1 per key.
        .max_channels_per_key(1, |chan| chan.transport().peer_addr().unwrap().ip())
        // Every request on a channel is handed to `spawn`; all channels share
        // the same `Server` state through its clones.
        .map(|chan| chan.execute(server.clone().serve()).for_each(spawn))
        .buffer_unordered(10);
    connections.for_each(|_| async {}).await;

    Ok(())
}