fluxus_api/operators/
map.rs1use async_trait::async_trait;
2use fluxus_core::{Operator, Record, StreamResult};
3use std::marker::PhantomData;
4
/// Stream operator that holds a mapping function `F` from `T` to `R`.
///
/// The operator stores only the function itself; no value of type `T` or `R`
/// is kept between calls.
pub struct MapOperator<T, R, F> {
    /// The user-supplied mapping function.
    f: F,
    /// Ties the otherwise unused `T` and `R` parameters to the struct.
    ///
    /// `fn(T) -> R` is used instead of `(T, R)` because the operator never
    /// owns a `T` or an `R`: with this marker the struct's `Send`/`Sync`
    /// auto traits depend on `F` alone, and variance matches that of a
    /// function consuming `T` and producing `R`.
    _phantom: PhantomData<fn(T) -> R>,
}
9
10impl<T, R, F> MapOperator<T, R, F>
11where
12 F: Fn(T) -> R,
13{
14 pub fn new(f: F) -> Self {
15 Self {
16 f,
17 _phantom: PhantomData,
18 }
19 }
20}
21
22#[async_trait]
23impl<T, R, F> Operator<T, R> for MapOperator<T, R, F>
24where
25 T: Clone + Send + Sync + 'static,
26 R: Clone + Send + Sync + 'static,
27 F: Fn(T) -> R + Send + Sync,
28{
29 async fn init(&mut self) -> StreamResult<()> {
30 Ok(())
31 }
32
33 async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<R>>> {
34 let result = (self.f)(record.data);
35 Ok(vec![Record::with_timestamp(result, record.timestamp)])
36 }
37
38 async fn close(&mut self) -> StreamResult<()> {
39 Ok(())
40 }
41}