pipebase/map/
project.rs

1use super::Map;
2use crate::common::{ConfigInto, FromConfig, FromPath, Project};
3use async_trait::async_trait;
4use serde::Deserialize;
5use std::path::Path;
6
7#[derive(Deserialize)]
8pub struct ProjectionConfig {}
9
10#[async_trait]
11impl FromPath for ProjectionConfig {
12    async fn from_path<P>(_path: P) -> anyhow::Result<Self>
13    where
14        P: AsRef<Path> + Send,
15    {
16        Ok(ProjectionConfig {})
17    }
18}
19
20#[async_trait]
21impl ConfigInto<Projection> for ProjectionConfig {}
22
23/// Project from type to type
24pub struct Projection {}
25
26#[async_trait]
27impl FromConfig<ProjectionConfig> for Projection {
28    async fn from_config(_config: ProjectionConfig) -> anyhow::Result<Self> {
29        Ok(Projection {})
30    }
31}
32
33/// # Parameters
34/// * T: input
35/// * U: output
36#[async_trait]
37impl<T, U> Map<T, U, ProjectionConfig> for Projection
38where
39    T: Send + 'static,
40    U: Project<T>,
41{
42    async fn map(&mut self, data: T) -> anyhow::Result<U> {
43        Ok(U::project(&data))
44    }
45}
46
47#[cfg(test)]
48mod tests {
49
50    use crate::prelude::*;
51
52    #[derive(Debug)]
53    struct Record {
54        pub r0: i32,
55        pub r1: i32,
56    }
57
58    #[derive(Clone, Debug, Project)]
59    #[project(input = "self::Record")]
60    struct SwappedRecord {
61        #[project(from = "r1")]
62        pub r0: i32,
63        #[project(from = "r0")]
64        pub r1: i32,
65    }
66
67    #[tokio::test]
68    async fn test_reverse_processor() {
69        let (tx0, rx0) = channel!(Record, 1024);
70        let (tx1, mut rx1) = channel!(self::SwappedRecord, 1024);
71        let channels = pipe_channels!(rx0, [tx1]);
72        let config = config!(ProjectionConfig);
73        let pipe = mapper!("swapped");
74        let context = pipe.get_context();
75        let f1 = populate_records(tx0, vec![Record { r0: 0, r1: 1 }]);
76        f1.await;
77        join_pipes!([run_pipe!(pipe, config, channels)]);
78        let swapped_record = rx1.recv().await.unwrap();
79        assert_eq!(1, swapped_record.r0);
80        assert_eq!(0, swapped_record.r1);
81        context.validate(State::Done, 1);
82    }
83}