piperedis/map/aggregate/
add.rs1use super::table::RedisGroupTable;
2use async_trait::async_trait;
3use pipebase::{
4 common::{AggregateAs, ConfigInto, FromConfig, FromPath, GroupAggregate, GroupAs, Init, Pair},
5 map::Map,
6};
7use redis::{FromRedisValue, ToRedisArgs};
8use serde::Deserialize;
9use std::collections::HashMap;
10use std::{hash::Hash, iter::FromIterator};
11
12#[derive(Deserialize)]
13pub struct RedisUnorderedGroupAddAggregatorConfig {
14 url: String,
15}
16
17impl FromPath for RedisUnorderedGroupAddAggregatorConfig {}
18
19impl ConfigInto<RedisUnorderedGroupAddAggregator> for RedisUnorderedGroupAddAggregatorConfig {}
20
21pub struct RedisUnorderedGroupAddAggregator {
22 url: String,
23}
24
25#[async_trait]
26impl FromConfig<RedisUnorderedGroupAddAggregatorConfig> for RedisUnorderedGroupAddAggregator {
27 async fn from_config(config: RedisUnorderedGroupAddAggregatorConfig) -> anyhow::Result<Self> {
28 Ok(RedisUnorderedGroupAddAggregator { url: config.url })
29 }
30}
31
32impl<I, T, K, V, U> GroupAggregate<I, T, K, V, U, RedisGroupTable<HashMap<K, V>>>
33 for RedisUnorderedGroupAddAggregator
34where
35 I: GroupAs<K> + AggregateAs<V>,
36 K: Hash + Eq + PartialEq + Clone + ToRedisArgs,
37 V: std::ops::AddAssign<V> + Init + ToRedisArgs + FromRedisValue + Clone,
38 T: IntoIterator<Item = I>,
39 U: FromIterator<Pair<K, V>>,
40{
41 fn merge(&self, v: &mut V, i: &I) {
42 *v += i.aggregate_value();
43 }
44
45 fn group_table(&self) -> anyhow::Result<RedisGroupTable<HashMap<K, V>>> {
46 RedisGroupTable::new(self.url.to_owned(), HashMap::new())
47 }
48}
49
50#[async_trait]
51impl<I, K, V, T> Map<T, Vec<Pair<K, V>>, RedisUnorderedGroupAddAggregatorConfig>
52 for RedisUnorderedGroupAddAggregator
53where
54 I: GroupAs<K> + AggregateAs<V>,
55 K: Hash + Eq + PartialEq + Clone + ToRedisArgs,
56 V: std::ops::AddAssign<V> + Init + ToRedisArgs + FromRedisValue + Clone,
57 T: IntoIterator<Item = I> + Send + 'static,
58{
59 async fn map(&mut self, data: T) -> anyhow::Result<Vec<Pair<K, V>>> {
60 let sums = self.group_aggregate(data)?;
61 Ok(sums)
62 }
63}