fluxus_api/operators/
map.rs1use async_trait::async_trait;
2use fluxus_transformers::Operator;
3use fluxus_utils::models::{Record, StreamResult};
4use std::marker::PhantomData;
5
6pub struct MapOperator<T, R, F> {
7 f: F,
8 _phantom: PhantomData<(T, R)>,
9}
10
11impl<T, R, F> MapOperator<T, R, F>
12where
13 F: Fn(T) -> R,
14{
15 pub fn new(f: F) -> Self {
16 Self {
17 f,
18 _phantom: PhantomData,
19 }
20 }
21}
22
23#[async_trait]
24impl<T, R, F> Operator<T, R> for MapOperator<T, R, F>
25where
26 T: Clone + Send + Sync + 'static,
27 R: Clone + Send + Sync + 'static,
28 F: Fn(T) -> R + Send + Sync,
29{
30 async fn init(&mut self) -> StreamResult<()> {
31 Ok(())
32 }
33
34 async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<R>>> {
35 let result = (self.f)(record.data);
36 Ok(vec![Record::with_timestamp(result, record.timestamp)])
37 }
38
39 async fn close(&mut self) -> StreamResult<()> {
40 Ok(())
41 }
42}