// piperedis/map/aggregate/add.rs

1use super::table::RedisGroupTable;
2use async_trait::async_trait;
3use pipebase::{
4    common::{AggregateAs, ConfigInto, FromConfig, FromPath, GroupAggregate, GroupAs, Init, Pair},
5    map::Map,
6};
7use redis::{FromRedisValue, ToRedisArgs};
8use serde::Deserialize;
9use std::collections::HashMap;
10use std::{hash::Hash, iter::FromIterator};
11
12#[derive(Deserialize)]
13pub struct RedisUnorderedGroupAddAggregatorConfig {
14    url: String,
15}
16
17impl FromPath for RedisUnorderedGroupAddAggregatorConfig {}
18
19impl ConfigInto<RedisUnorderedGroupAddAggregator> for RedisUnorderedGroupAddAggregatorConfig {}
20
/// Aggregator that groups items by key and adds their values together.
///
/// Its `GroupAggregate` implementation merges values with `+=` and keeps the
/// per-key state in a `RedisGroupTable` wrapping a `HashMap`.
// NOTE(review): "Unordered" presumably refers to the `HashMap` backing, which gives no
// ordering guarantee for the resulting pairs — confirm against sibling aggregators.
pub struct RedisUnorderedGroupAddAggregator {
    // Passed (as a copy) to `RedisGroupTable::new` every time a group table is created.
    url: String,
}
24
25#[async_trait]
26impl FromConfig<RedisUnorderedGroupAddAggregatorConfig> for RedisUnorderedGroupAddAggregator {
27    async fn from_config(config: RedisUnorderedGroupAddAggregatorConfig) -> anyhow::Result<Self> {
28        Ok(RedisUnorderedGroupAddAggregator { url: config.url })
29    }
30}
31
32impl<I, T, K, V, U> GroupAggregate<I, T, K, V, U, RedisGroupTable<HashMap<K, V>>>
33    for RedisUnorderedGroupAddAggregator
34where
35    I: GroupAs<K> + AggregateAs<V>,
36    K: Hash + Eq + PartialEq + Clone + ToRedisArgs,
37    V: std::ops::AddAssign<V> + Init + ToRedisArgs + FromRedisValue + Clone,
38    T: IntoIterator<Item = I>,
39    U: FromIterator<Pair<K, V>>,
40{
41    fn merge(&self, v: &mut V, i: &I) {
42        *v += i.aggregate_value();
43    }
44
45    fn group_table(&self) -> anyhow::Result<RedisGroupTable<HashMap<K, V>>> {
46        RedisGroupTable::new(self.url.to_owned(), HashMap::new())
47    }
48}
49
50#[async_trait]
51impl<I, K, V, T> Map<T, Vec<Pair<K, V>>, RedisUnorderedGroupAddAggregatorConfig>
52    for RedisUnorderedGroupAddAggregator
53where
54    I: GroupAs<K> + AggregateAs<V>,
55    K: Hash + Eq + PartialEq + Clone + ToRedisArgs,
56    V: std::ops::AddAssign<V> + Init + ToRedisArgs + FromRedisValue + Clone,
57    T: IntoIterator<Item = I> + Send + 'static,
58{
59    async fn map(&mut self, data: T) -> anyhow::Result<Vec<Pair<K, V>>> {
60        let sums = self.group_aggregate(data)?;
61        Ok(sums)
62    }
63}