use super::table::RocksDBGroupTable;
use async_trait::async_trait;
use pipebase::{
common::{AggregateAs, ConfigInto, FromConfig, FromPath, GroupAggregate, GroupAs, Init, Pair},
map::Map,
};
use pipebytes::{AsBytes, FromBytes, IntoBytes};
use serde::Deserialize;
use std::collections::HashMap;
use std::{hash::Hash, iter::FromIterator};
/// Deserializable settings for [`RocksDBUnorderedGroupAddAggregator`].
#[derive(Deserialize)]
pub struct RocksDBUnorderedGroupAddAggregatorConfig {
/// Moved into the aggregator's `path` field by `from_config`, which later hands it to
/// `RocksDBGroupTable::new`.
/// NOTE(review): presumably a RocksDB directory on disk — confirm against `RocksDBGroupTable`.
path: String,
}
// Empty impl: the config relies entirely on whatever `FromPath` provides by default.
// NOTE(review): presumably this is what lets the config be loaded from a file path — confirm in pipebase.
impl FromPath for RocksDBUnorderedGroupAddAggregatorConfig {}
// Empty impl: declares that this config can be turned into a `RocksDBUnorderedGroupAddAggregator`
// using the defaults supplied by `ConfigInto`.
// NOTE(review): presumably the default delegates to `FromConfig::from_config` — confirm in pipebase.
impl ConfigInto<RocksDBUnorderedGroupAddAggregator> for RocksDBUnorderedGroupAddAggregatorConfig {}
/// Aggregator that merges each item's aggregate value into its group with `+=`,
/// using a `RocksDBGroupTable<HashMap<K, V>>` as the group table.
pub struct RocksDBUnorderedGroupAddAggregator {
/// First argument passed to `RocksDBGroupTable::new`; taken from the config's `path`.
path: String,
}
#[async_trait]
impl FromConfig<RocksDBUnorderedGroupAddAggregatorConfig> for RocksDBUnorderedGroupAddAggregator {
async fn from_config(config: RocksDBUnorderedGroupAddAggregatorConfig) -> anyhow::Result<Self> {
Ok(RocksDBUnorderedGroupAddAggregator { path: config.path })
}
}
impl<I, T, K, V, U> GroupAggregate<I, T, K, V, U, RocksDBGroupTable<HashMap<K, V>>>
    for RocksDBUnorderedGroupAddAggregator
where
    I: GroupAs<K> + AggregateAs<V>,
    K: Hash + Eq + PartialEq + Clone + AsBytes + IntoBytes,
    V: std::ops::AddAssign<V> + Init + AsBytes + IntoBytes + FromBytes + Clone,
    T: IntoIterator<Item = I>,
    U: FromIterator<Pair<K, V>>,
{
    /// Folds the aggregate value of item `i` into the running value `v` using `+=`.
    fn merge(&self, v: &mut V, i: &I) {
        let contribution = i.aggregate_value();
        *v += contribution;
    }

    /// Creates a `RocksDBGroupTable` from the configured `path` and an empty `HashMap`.
    ///
    /// Any error reported by `RocksDBGroupTable::new` is returned unchanged.
    fn group_table(&self) -> anyhow::Result<RocksDBGroupTable<HashMap<K, V>>> {
        let empty: HashMap<K, V> = HashMap::new();
        RocksDBGroupTable::new(&self.path, empty)
    }
}
#[async_trait]
impl<I, K, V, T> Map<T, Vec<Pair<K, V>>, RocksDBUnorderedGroupAddAggregatorConfig>
    for RocksDBUnorderedGroupAddAggregator
where
    I: GroupAs<K> + AggregateAs<V>,
    K: Hash + Eq + PartialEq + Clone + AsBytes + IntoBytes,
    V: std::ops::AddAssign<V> + Init + FromBytes + AsBytes + IntoBytes + Clone,
    T: IntoIterator<Item = I> + Send + 'static,
{
    /// Runs the group aggregation over one batch and returns the resulting
    /// `(key, value)` pairs as a `Vec`.
    ///
    /// Errors from `group_aggregate` are propagated to the caller.
    async fn map(&mut self, data: T) -> anyhow::Result<Vec<Pair<K, V>>> {
        let grouped: Vec<Pair<K, V>> = self.group_aggregate(data)?;
        Ok(grouped)
    }
}
#[cfg(test)]
mod rockdb_group_sum_tests {
    use crate::*;
    use pipebase::prelude::*;
    use tokio::sync::mpsc::Sender;

    /// On-disk location this test removes once it is done.
    const DATA_DIR: &str = "resources/data/rocks/sum";

    /// Removes `DATA_DIR` when created and again when dropped.
    ///
    /// The expected sums below depend on what is persisted between batches, so
    /// leftovers from an aborted run (or from a failed assertion in this run)
    /// would make every later run fail until someone cleans up by hand.
    struct DataDirGuard;

    impl DataDirGuard {
        fn new() -> Self {
            // Best effort: the directory normally does not exist yet.
            let _ = std::fs::remove_dir_all(DATA_DIR);
            DataDirGuard
        }
    }

    impl Drop for DataDirGuard {
        fn drop(&mut self) {
            // Best effort: on the success path the directory is already gone.
            let _ = std::fs::remove_dir_all(DATA_DIR);
        }
    }

    /// Input item: grouped by `id`, aggregated by summing `value`.
    #[derive(AggregateAs, GroupAs)]
    struct Record {
        #[group]
        id: String,
        #[agg(sum)]
        value: u32,
    }

    impl Record {
        pub fn new(id: &str, value: u32) -> Self {
            Record {
                id: id.to_owned(),
                value,
            }
        }
    }

    /// Sends every item of `records` through `tx`, then drops the sender.
    ///
    /// Panics if the receiving side is gone, so a broken setup fails here
    /// instead of surfacing later as a confusing missing result.
    async fn populate_records<T, U>(tx: Sender<T>, records: U)
    where
        U: IntoIterator<Item = T>,
    {
        for record in records {
            assert!(
                tx.send(record).await.is_ok(),
                "receiver dropped before all records were sent"
            );
        }
    }

    #[tokio::test]
    async fn test_record_group_sum() {
        // Declared first so it is dropped last, after the pipe and channels are gone.
        let _cleanup = DataDirGuard::new();
        let (tx0, rx0) = channel!(Vec<Record>, 1024);
        let (tx1, mut rx1) = channel!(Vec<Pair<String, u32>>, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(
            RocksDBUnorderedGroupAddAggregatorConfig,
            "resources/catalogs/rocksdb_sum.yml"
        );
        let pipe = mapper!("record_sum");
        // Two identical batches; both fit in the channel buffer, so they can be
        // queued before the pipe starts running.
        let f0 = populate_records(
            tx0,
            vec![
                vec![
                    Record::new("foo", 1),
                    Record::new("foo", 2),
                    Record::new("bar", 3),
                ],
                vec![
                    Record::new("foo", 1),
                    Record::new("foo", 2),
                    Record::new("bar", 3),
                ],
            ],
        );
        f0.await;
        let pipe_run = run_pipe!(pipe, config, channels);
        let _ = pipe_run.await;

        // First batch: foo = 1 + 2, bar = 3.
        let gs = rx1.recv().await.expect("first group sums not found");
        assert_eq!(2, gs.len());
        for sum in gs {
            match sum.left().as_str() {
                "foo" => assert_eq!(&3, sum.right()),
                "bar" => assert_eq!(&3, sum.right()),
                _ => unreachable!("unexpected group {}", sum.left()),
            }
        }

        // Second batch is added on top of the first: foo = 3 + 3, bar = 3 + 3.
        let gs = rx1.recv().await.expect("second group sums not found");
        assert_eq!(2, gs.len());
        for sum in gs {
            match sum.left().as_str() {
                "foo" => assert_eq!(&6, sum.right()),
                "bar" => assert_eq!(&6, sum.right()),
                _ => unreachable!("unexpected group {}", sum.left()),
            }
        }

        // Success path: still fail the test if the data directory cannot be removed.
        std::fs::remove_dir_all(DATA_DIR).unwrap()
    }
}
#[cfg(test)]
mod unordered_group_avg_f32_tests {
    use crate::*;
    use pipebase::prelude::*;
    use tokio::sync::mpsc::Sender;

    /// On-disk location this test removes once it is done.
    const DATA_DIR: &str = "resources/data/rocks/avg";

    /// Removes `DATA_DIR` when created and again when dropped.
    ///
    /// The expected averages below depend on what is persisted between batches,
    /// so leftovers from an aborted run (or from a failed assertion in this run)
    /// would make every later run fail until someone cleans up by hand.
    struct DataDirGuard;

    impl DataDirGuard {
        fn new() -> Self {
            // Best effort: the directory normally does not exist yet.
            let _ = std::fs::remove_dir_all(DATA_DIR);
            DataDirGuard
        }
    }

    impl Drop for DataDirGuard {
        fn drop(&mut self) {
            // Best effort: on the success path the directory is already gone.
            let _ = std::fs::remove_dir_all(DATA_DIR);
        }
    }

    /// Input item: grouped by `id`, aggregated as an `f32` average of `value`.
    #[derive(Clone, Debug, AggregateAs, GroupAs)]
    struct Record {
        #[group]
        id: String,
        #[agg(avgf32)]
        value: i32,
    }

    /// Sends every item of `records` through `tx`, then drops the sender.
    ///
    /// Panics if the receiving side is gone, so a broken setup fails here
    /// instead of surfacing later as a confusing missing result.
    async fn populate_records<T, U>(tx: Sender<T>, records: U)
    where
        U: IntoIterator<Item = T>,
    {
        for record in records {
            assert!(
                tx.send(record).await.is_ok(),
                "receiver dropped before all records were sent"
            );
        }
    }

    #[tokio::test]
    async fn test_unordered_group_avg_f32() {
        // Declared first so it is dropped last, after the pipe and channels are gone.
        let _cleanup = DataDirGuard::new();
        let (tx0, rx0) = channel!(Vec<Record>, 1024);
        let (tx1, mut rx1) = channel!(Vec<Pair<String, Averagef32>>, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(
            RocksDBUnorderedGroupAddAggregatorConfig,
            "resources/catalogs/rocksdb_avg.yml"
        );
        let pipe = mapper!("group_avg_f32");
        let pipe = run_pipe!(pipe, config, channels);
        let f0 = populate_records(
            tx0,
            vec![
                vec![
                    Record {
                        id: "foo".to_owned(),
                        value: 1,
                    },
                    Record {
                        id: "foo".to_owned(),
                        value: 2,
                    },
                    Record {
                        id: "bar".to_owned(),
                        value: 2,
                    },
                    Record {
                        id: "bar".to_owned(),
                        value: 3,
                    },
                ],
                vec![
                    Record {
                        id: "foo".to_owned(),
                        value: 3,
                    },
                    Record {
                        id: "bar".to_owned(),
                        value: 4,
                    },
                ],
            ],
        );
        f0.await;
        join_pipes!([pipe]);

        // First batch: foo = (1 + 2) / 2, bar = (2 + 3) / 2.
        let group_avgs = rx1.recv().await.expect("group average not found");
        // Without this check a missing group would slip through the loop below.
        assert_eq!(2, group_avgs.len());
        for avg in group_avgs {
            match &avg.left()[..] {
                "foo" => {
                    assert_eq!(1.5, avg.right().average())
                }
                "bar" => {
                    assert_eq!(2.5, avg.right().average())
                }
                _ => unreachable!("unexpected group {}", avg.left()),
            }
        }

        // Second batch extends the first: foo = (1 + 2 + 3) / 3, bar = (2 + 3 + 4) / 3.
        let group_avgs = rx1.recv().await.expect("group average not found");
        assert_eq!(2, group_avgs.len());
        for avg in group_avgs {
            match &avg.left()[..] {
                "foo" => {
                    assert_eq!(2.0, avg.right().average())
                }
                "bar" => {
                    assert_eq!(3.0, avg.right().average())
                }
                _ => unreachable!("unexpected group {}", avg.left()),
            }
        }

        // Success path: still fail the test if the data directory cannot be removed.
        std::fs::remove_dir_all(DATA_DIR).unwrap()
    }
}
#[cfg(test)]
mod group_count32_tests {
    use crate::*;
    use pipebase::prelude::*;
    use tokio::sync::mpsc::Sender;

    /// On-disk location this test removes once it is done.
    const DATA_DIR: &str = "resources/data/rocks/count";

    /// Removes `DATA_DIR` when created and again when dropped.
    ///
    /// The expected counts below depend on what is persisted between batches,
    /// so leftovers from an aborted run (or from a failed assertion in this run)
    /// would make every later run fail until someone cleans up by hand.
    struct DataDirGuard;

    impl DataDirGuard {
        fn new() -> Self {
            // Best effort: the directory normally does not exist yet.
            let _ = std::fs::remove_dir_all(DATA_DIR);
            DataDirGuard
        }
    }

    impl Drop for DataDirGuard {
        fn drop(&mut self) {
            // Best effort: on the success path the directory is already gone.
            let _ = std::fs::remove_dir_all(DATA_DIR);
        }
    }

    /// Input item: grouped by `key`, aggregated as a 32-bit count.
    #[derive(Debug, Clone, GroupAs, AggregateAs)]
    #[agg(count32)]
    struct Record {
        #[group]
        key: String,
    }

    /// Sends every item of `records` through `tx`, then drops the sender.
    ///
    /// Panics if the receiving side is gone, so a broken setup fails here
    /// instead of surfacing later as a confusing missing result.
    async fn populate_records<T, U>(tx: Sender<T>, records: U)
    where
        U: IntoIterator<Item = T>,
    {
        for record in records {
            assert!(
                tx.send(record).await.is_ok(),
                "receiver dropped before all records were sent"
            );
        }
    }

    #[tokio::test]
    async fn test_group_count32() {
        // Declared first so it is dropped last, after the pipe and channels are gone.
        let _cleanup = DataDirGuard::new();
        let (tx0, rx0) = channel!(Vec<Record>, 1024);
        let (tx1, mut rx1) = channel!(Vec<Pair<String, Count32>>, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(
            RocksDBUnorderedGroupAddAggregatorConfig,
            "resources/catalogs/rocksdb_count.yml"
        );
        let pipe = mapper!("group_count32");
        let pipe = run_pipe!(pipe, config, channels);
        let f0 = populate_records(
            tx0,
            vec![
                vec![
                    Record {
                        key: "foo".to_owned(),
                    },
                    Record {
                        key: "foo".to_owned(),
                    },
                    Record {
                        key: "bar".to_owned(),
                    },
                ],
                vec![
                    Record {
                        key: "bar".to_owned(),
                    },
                    Record {
                        key: "bar".to_owned(),
                    },
                    Record {
                        key: "foo".to_owned(),
                    },
                ],
            ],
        );
        f0.await;
        join_pipes!([pipe]);

        // First batch: two "foo" records and one "bar" record.
        let group_counts = rx1.recv().await.expect("group count32 not found");
        // Without this check a missing group would slip through the loop below.
        assert_eq!(2, group_counts.len());
        for count in group_counts {
            match &count.left()[..] {
                "foo" => {
                    assert_eq!(2, count.right().get())
                }
                "bar" => {
                    assert_eq!(1, count.right().get())
                }
                _ => unreachable!("unexpected group {}", count.left()),
            }
        }

        // Second batch is counted on top of the first: foo = 2 + 1, bar = 1 + 2.
        let group_counts = rx1.recv().await.expect("group count32 not found");
        assert_eq!(2, group_counts.len());
        for count in group_counts {
            match &count.left()[..] {
                "foo" => {
                    assert_eq!(3, count.right().get())
                }
                "bar" => {
                    assert_eq!(3, count.right().get())
                }
                _ => unreachable!("unexpected group {}", count.left()),
            }
        }

        // Success path: still fail the test if the data directory cannot be removed.
        std::fs::remove_dir_all(DATA_DIR).unwrap()
    }
}