fluxus_api/operators/
map.rs

1use async_trait::async_trait;
2use fluxus_transformers::Operator;
3use fluxus_utils::models::{Record, StreamResult};
4use std::marker::PhantomData;
5
6pub struct MapOperator<T, R, F> {
7    f: F,
8    _phantom: PhantomData<(T, R)>,
9}
10
11impl<T, R, F> MapOperator<T, R, F>
12where
13    F: Fn(T) -> R,
14{
15    pub fn new(f: F) -> Self {
16        Self {
17            f,
18            _phantom: PhantomData,
19        }
20    }
21}
22
23#[async_trait]
24impl<T, R, F> Operator<T, R> for MapOperator<T, R, F>
25where
26    T: Clone + Send + Sync + 'static,
27    R: Clone + Send + Sync + 'static,
28    F: Fn(T) -> R + Send + Sync,
29{
30    async fn init(&mut self) -> StreamResult<()> {
31        Ok(())
32    }
33
34    async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<R>>> {
35        let result = (self.f)(record.data);
36        Ok(vec![Record::with_timestamp(result, record.timestamp)])
37    }
38
39    async fn close(&mut self) -> StreamResult<()> {
40        Ok(())
41    }
42}