pipebase/map/
runtime.rs

1use std::fmt::Debug;
2
3use async_trait::async_trait;
4use tokio::sync::mpsc::{error::SendError, Sender};
5use tokio::task::JoinHandle;
6use tracing::{error, info};
7
8use super::Map;
9use crate::common::{
10    filter_senders_by_indices, replicate, send_pipe_error, senders_as_map, spawn_send,
11    wait_join_handles, ConfigInto, Context, HasContext, Pipe, PipeChannels, PipeError, Result,
12    State, SubscribeError,
13};
14use std::collections::HashMap;
15use std::sync::Arc;
16
17pub struct Mapper<'a> {
18    name: &'a str,
19    context: Arc<Context>,
20    etx: Option<Sender<PipeError>>,
21}
22
23/// Start loop
24/// * Receive and map data
25/// * Send mapper's output to downstrem
26/// # Parameters
27/// * T: input
28/// * U: output
29/// * M: mapper
30#[async_trait]
31impl<'a, T, U, M, C> Pipe<T, U, M, C> for Mapper<'a>
32where
33    T: Send + Sync + 'static,
34    U: Clone + Debug + Send + 'static,
35    M: Map<T, U, C>,
36    C: ConfigInto<M> + Send + Sync + 'static,
37{
38    async fn run(self, config: C, channels: PipeChannels<T, U>) -> Result<()> {
39        let name = self.name;
40        let context = self.context;
41        let etx = self.etx;
42        let (mut rx, txs) = channels.into_channels();
43        assert!(rx.is_some(), "mapper '{}' has no upstreams", name);
44        assert!(!txs.is_empty(), "mapper '{}' has no downstreams", name);
45        let mut mapper = config.config_into().await?;
46        let mut txs = senders_as_map(txs);
47        let rx = rx.as_mut().unwrap();
48        info!(name = name, ty = "mapper", "run ...");
49        loop {
50            context.set_state(State::Receive);
51            // if all receiver dropped, sender drop as well
52            match txs.is_empty() {
53                true => {
54                    break;
55                }
56                false => (),
57            }
58            let t = rx.recv().await;
59            let t = match t {
60                Some(t) => t,
61                None => {
62                    break;
63                }
64            };
65            context.set_state(State::Map);
66            let u = match mapper.map(t).await {
67                Ok(u) => u,
68                Err(err) => {
69                    error!(name = name, ty = "mapper", "error '{:#?}'", err);
70                    context.inc_total_run();
71                    context.inc_failure_run();
72                    send_pipe_error(etx.as_ref(), PipeError::new(name.to_owned(), err)).await;
73                    continue;
74                }
75            };
76            context.set_state(State::Send);
77            let mut u_replicas = replicate(u, txs.len());
78            let jhs: HashMap<usize, JoinHandle<core::result::Result<(), SendError<U>>>> = txs
79                .iter()
80                .map(|(idx, tx)| {
81                    (
82                        idx.to_owned(),
83                        spawn_send(tx.to_owned(), u_replicas.pop().expect("no replica left")),
84                    )
85                })
86                .collect();
87            assert!(u_replicas.is_empty(), "replica leftover");
88            let drop_sender_indices = wait_join_handles(jhs).await;
89            filter_senders_by_indices(&mut txs, drop_sender_indices);
90            context.inc_total_run();
91        }
92        info!(name = name, ty = "mapper", "exit ...");
93        context.set_state(State::Done);
94        Ok(())
95    }
96}
97
98impl<'a> HasContext for Mapper<'a> {
99    fn get_name(&self) -> String {
100        self.name.to_owned()
101    }
102
103    fn get_context(&self) -> Arc<Context> {
104        self.context.clone()
105    }
106}
107
108impl<'a> Mapper<'a> {
109    pub fn new(name: &'a str) -> Self {
110        Mapper {
111            name,
112            context: Default::default(),
113            etx: None,
114        }
115    }
116}
117
118impl<'a> SubscribeError for Mapper<'a> {
119    fn subscribe_error(&mut self, tx: Sender<crate::common::PipeError>) {
120        self.etx = Some(tx)
121    }
122}
123
124#[macro_export]
125macro_rules! mapper {
126    (
127        $name:expr
128    ) => {{
129        Mapper::new($name)
130    }};
131}