fluxus_api/operators/
map.rs

1use async_trait::async_trait;
2use fluxus_core::{Operator, Record, StreamResult};
3use std::marker::PhantomData;
4
/// Stream operator that transforms each record's payload with a user-supplied function.
///
/// * `T` – payload type of incoming records.
/// * `R` – payload type of emitted records.
/// * `F` – the mapping function, used as `Fn(T) -> R`.
pub struct MapOperator<T, R, F> {
    /// Function applied to every incoming payload.
    f: F,
    /// Ties `T` and `R` to the type without storing a value of either.
    ///
    /// The `fn(T) -> R` form states only that the operator consumes `T` and
    /// produces `R`; unlike `PhantomData<(T, R)>` it does not pretend to own
    /// those values, so the marker never restricts the `Send`/`Sync` auto
    /// traits of the operator.
    _phantom: PhantomData<fn(T) -> R>,
}
9
10impl<T, R, F> MapOperator<T, R, F>
11where
12    F: Fn(T) -> R,
13{
14    pub fn new(f: F) -> Self {
15        Self {
16            f,
17            _phantom: PhantomData,
18        }
19    }
20}
21
22#[async_trait]
23impl<T, R, F> Operator<T, R> for MapOperator<T, R, F>
24where
25    T: Clone + Send + Sync + 'static,
26    R: Clone + Send + Sync + 'static,
27    F: Fn(T) -> R + Send + Sync,
28{
29    async fn init(&mut self) -> StreamResult<()> {
30        Ok(())
31    }
32
33    async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<R>>> {
34        let result = (self.f)(record.data);
35        Ok(vec![Record::with_timestamp(result, record.timestamp)])
36    }
37
38    async fn close(&mut self) -> StreamResult<()> {
39        Ok(())
40    }
41}