1use super::Map;
2use crate::common::{ConfigInto, FromConfig, FromPath, Project};
3use async_trait::async_trait;
4use serde::Deserialize;
5use std::path::Path;
6
7#[derive(Deserialize)]
8pub struct ProjectionConfig {}
9
10#[async_trait]
11impl FromPath for ProjectionConfig {
12 async fn from_path<P>(_path: P) -> anyhow::Result<Self>
13 where
14 P: AsRef<Path> + Send,
15 {
16 Ok(ProjectionConfig {})
17 }
18}
19
20#[async_trait]
21impl ConfigInto<Projection> for ProjectionConfig {}
22
/// Mapper that turns each input item into its projected output type.
///
/// The struct has no fields and therefore holds no state between calls.
/// `Debug` and `Default` are derived so the type can be logged and
/// constructed directly.
#[derive(Debug, Default)]
pub struct Projection {}
25
26#[async_trait]
27impl FromConfig<ProjectionConfig> for Projection {
28 async fn from_config(_config: ProjectionConfig) -> anyhow::Result<Self> {
29 Ok(Projection {})
30 }
31}
32
33#[async_trait]
37impl<T, U> Map<T, U, ProjectionConfig> for Projection
38where
39 T: Send + 'static,
40 U: Project<T>,
41{
42 async fn map(&mut self, data: T) -> anyhow::Result<U> {
43 Ok(U::project(&data))
44 }
45}
46
#[cfg(test)]
mod tests {
    use crate::prelude::*;

    // Input fixture: a plain record with two integer fields.
    #[derive(Debug)]
    struct Record {
        pub r0: i32,
        pub r1: i32,
    }

    // Output fixture: each field is sourced from the opposite field of
    // `Record`, so a correct projection swaps the two values.
    #[derive(Clone, Debug, Project)]
    #[project(input = "self::Record")]
    struct SwappedRecord {
        #[project(from = "r1")]
        pub r0: i32,
        #[project(from = "r0")]
        pub r1: i32,
    }

    // Sends one record through a projection pipe and checks that the fields
    // come out swapped.
    #[tokio::test]
    async fn test_reverse_processor() {
        let (tx0, rx0) = channel!(Record, 1024);
        let (tx1, mut rx1) = channel!(self::SwappedRecord, 1024);
        let channels = pipe_channels!(rx0, [tx1]);
        let config = config!(ProjectionConfig);
        let pipe = mapper!("swapped");
        let context = pipe.get_context();

        // Queue the single input record before the pipe starts running.
        let input = vec![Record { r0: 0, r1: 1 }];
        populate_records(tx0, input).await;

        join_pipes!([run_pipe!(pipe, config, channels)]);

        let projected = rx1.recv().await.unwrap();
        assert_eq!(1, projected.r0);
        assert_eq!(0, projected.r1);
        context.validate(State::Done, 1);
    }
}