use std::iter::{FromIterator, IntoIterator};
use crate::common::{ConfigInto, Filter, FromConfig, FromPath};
use super::Map;
use async_trait::async_trait;
use serde::Deserialize;
use std::path::Path;
/// Configuration type paired with [`FilterMap`].
///
/// The struct has no fields, so there is nothing to configure; it exists so the
/// mapper can be wired through the same config-driven traits as other pipes.
#[derive(Deserialize)]
pub struct FilterMapConfig {}
#[async_trait]
impl FromPath for FilterMapConfig {
    /// Builds a [`FilterMapConfig`] without consulting `_path`.
    ///
    /// The configuration has no fields, so the path argument is accepted only to
    /// satisfy the [`FromPath`] signature; nothing is read from it.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`.
    async fn from_path<P: AsRef<Path> + Send>(_path: P) -> anyhow::Result<Self> {
        Ok(Self {})
    }
}
// Implements `ConfigInto<FilterMap>` for the config type. The impl body is
// empty, so all behavior comes from the default items of the `ConfigInto`
// trait (imported from `crate::common`).
#[async_trait]
impl ConfigInto<FilterMap> for FilterMapConfig {}
/// Mapper that keeps only the items accepted by their own [`Filter`] implementation.
///
/// The struct has no fields and therefore carries no state between calls.
pub struct FilterMap {}
#[async_trait]
impl FromConfig<FilterMapConfig> for FilterMap {
    /// Creates a [`FilterMap`] from its (field-less) configuration.
    ///
    /// `_config` is consumed but not inspected, because neither the config nor
    /// the mapper has any fields.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`.
    async fn from_config(_config: FilterMapConfig) -> anyhow::Result<Self> {
        Ok(Self {})
    }
}
#[async_trait]
impl<T, U, V> Map<U, V, FilterMapConfig> for FilterMap
where
    T: Filter + Clone + Sync,
    U: IntoIterator<Item = T> + Send + Clone + 'static,
    V: FromIterator<T> + Send,
{
    /// Filters `data`, keeping the items for which `T::filter` returns `true`.
    ///
    /// The input is consumed item by item and the surviving items are collected
    /// into the output collection `V`, preserving iteration order. `self` is
    /// neither read nor modified.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`.
    async fn map(&mut self, data: U) -> anyhow::Result<V> {
        let retained: V = data
            .into_iter()
            .filter(|candidate| T::filter(candidate))
            .collect();
        Ok(retained)
    }
}
// Test for the mapper registered under the name "filter_map", driven through
// the project's pipe macros (brought in via `crate::prelude`).
#[cfg(test)]
mod tests {
use crate::prelude::*;
// Test record. The `filter` attribute supplies the predicate that the derived
// `Filter` implementation is expected to apply (alias `r` refers to the record):
// a record is kept only when `r0 + r1 < 1`.
#[derive(Clone, Debug, Filter)]
#[filter(alias = "r", predicate = "r.r0 + r.r1 < 1")]
struct Record {
pub r0: i32,
pub r1: i32,
}
// Sends one batch of three records through the pipe and checks that only the
// record satisfying the predicate comes out the other side.
#[tokio::test]
async fn test_filter_map() {
// rx0 feeds the pipe; tx1/rx1 carry the pipe's output back to the test.
let (tx0, rx0) = channel!(Vec<Record>, 1024);
let (tx1, mut rx1) = channel!(Vec<self::Record>, 1024);
let channels = pipe_channels!(rx0, [tx1]);
let config = config!(FilterMapConfig);
let pipe = mapper!("filter_map");
// A single batch: the sums r0 + r1 are 1, 1 and 0, so only the last record
// satisfies `r0 + r1 < 1`.
let f1 = populate_records(
tx0,
vec![vec![
Record { r0: 1, r1: 0 },
Record { r0: 0, r1: 1 },
Record { r0: 0, r1: 0 },
]],
);
// The input is populated before the pipe is run.
// NOTE(review): presumably `populate_records` drops `tx0` when done so the pipe can terminate — confirm.
f1.await;
join_pipes!([run_pipe!(pipe, config, channels)]);
// Exactly one record, (0, 0), is expected in the filtered batch.
let filtered_records = rx1.recv().await.unwrap();
assert_eq!(1, filtered_records.len());
assert_eq!(0, filtered_records[0].r0);
assert_eq!(0, filtered_records[0].r1);
}
}