use anyhow::Result as AnyResult;
use chrono::Datelike;
use clap::Parser;
use dbsp::circuit::CircuitConfig;
use dbsp::typed_batch::IndexedZSetReader;
use dbsp::utils::Tup2;
use dbsp::{
DBSPHandle, OrdIndexedZSet, OutputHandle, RootCircuit, Runtime, ZSetHandle, circuit::Layout,
utils::Tup3,
};
use futures::{future, prelude::*};
use std::{
net::SocketAddr,
sync::{Arc, Mutex, MutexGuard},
};
use tarpc::{
context,
serde_transport::tcp::listen,
server::{self, Channel, incoming::Incoming},
tokio_serde::formats::Bincode,
};
mod service;
use service::*;
#[derive(Debug, Clone, Parser)]
struct Args {
#[clap(long, default_value = "127.0.0.1:0")]
address: SocketAddr,
}
#[allow(clippy::type_complexity)]
fn build_circuit(
circuit: &mut RootCircuit,
) -> AnyResult<(
ZSetHandle<Record>,
OutputHandle<OrdIndexedZSet<String, VaxMonthly>>,
)> {
let (input_stream, input_handle) = circuit.add_input_zset::<Record>();
let subset = input_stream.filter(|r| {
r.location == "England"
|| r.location == "Northern Ireland"
|| r.location == "Scotland"
|| r.location == "Wales"
});
let monthly_totals = subset
.map_index(|r| {
let date = chrono::NaiveDate::from_epoch_days(r.date).unwrap();
(
Tup3(r.location.clone(), date.year(), date.month() as u8),
r.daily_vaccinations.unwrap_or(0),
)
})
.aggregate_linear(|v| *v as i64);
let most_vax = monthly_totals
.map_index(|(Tup3(l, y, m), sum)| {
(
l.clone(),
VaxMonthly {
count: *sum as u64,
year: *y,
month: *m,
},
)
})
.topk_desc(3);
Ok((input_handle, most_vax.output()))
}
struct Inner {
circuit: DBSPHandle,
input_handle: ZSetHandle<Record>,
output_handle: OutputHandle<OrdIndexedZSet<String, VaxMonthly>>,
}
impl Inner {
fn new(layout: impl Into<CircuitConfig>) -> AnyResult<Inner> {
let (circuit, (input_handle, output_handle)) =
Runtime::init_circuit(layout, build_circuit)?;
Ok(Inner {
circuit,
input_handle,
output_handle,
})
}
}
#[derive(Clone)]
struct Server(Arc<Mutex<Option<Inner>>>);
impl Server {
fn new() -> Server {
Server(Arc::new(Mutex::new(None)))
}
fn inner(&self) -> MutexGuard<'_, Option<Inner>> {
self.0.lock().unwrap()
}
fn replace(&self, layout: impl Into<CircuitConfig>) {
let mut inner = self.inner();
drop(inner.take());
*inner = Some(Inner::new(layout).unwrap());
}
}
impl Circuit for Server {
async fn init(self, _: context::Context, layout: Layout) {
self.replace(layout)
}
async fn run(
self,
_: context::Context,
mut records: Vec<Tup2<Record, i64>>,
) -> Vec<(String, VaxMonthly, i64)> {
self.inner()
.as_ref()
.unwrap()
.input_handle
.append(&mut records);
self.inner()
.as_mut()
.unwrap()
.circuit
.transaction()
.unwrap();
self.inner()
.as_ref()
.unwrap()
.output_handle
.consolidate()
.iter()
.collect()
}
}
async fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
tokio::spawn(fut);
}
#[tokio::main]
async fn main() -> AnyResult<()> {
let Args { address } = Args::parse();
let mut listener = listen(address, Bincode::default).await?;
println!("Listening on port {}", listener.local_addr().port());
listener.config_mut().max_frame_length(usize::MAX);
let server = Server::new();
listener
.filter_map(|r| future::ready(r.ok()))
.map(server::BaseChannel::with_defaults)
.max_channels_per_key(1, |t| t.transport().peer_addr().unwrap().ip())
.map(|channel| channel.execute(server.clone().serve()).for_each(spawn))
.buffer_unordered(10)
.for_each(|_| async {})
.await;
Ok(())
}