pipebase/stream/
runtime.rs

1use async_trait::async_trait;
2use std::collections::HashMap;
3use std::sync::Arc;
4use tokio::sync::mpsc::{channel, error::SendError, Sender};
5use tokio::task::JoinHandle;
6use tracing::{error, info};
7
8use super::Stream;
9use crate::common::{
10    filter_senders_by_indices, replicate, send_pipe_error, senders_as_map, spawn_send,
11    wait_join_handles, ConfigInto, Context, HasContext, Pipe, PipeChannels, PipeError, Result,
12    State, SubscribeError,
13};
14
/// Pipe runtime that drives a [`Stream`] implementation.
pub struct Streamer<'a> {
    /// Pipe name; attached to log events and to reported `PipeError`s.
    name: &'a str,
    /// Shared context that receives state transitions and run counters while the pipe runs.
    context: Arc<Context>,
    /// Optional error channel, set through `subscribe_error`; `None` until subscribed.
    etx: Option<Sender<PipeError>>,
}
20
21/// Spawn two tasks
22/// * Run streamer
23/// * Receive data from streamer and send downstreams
24/// # Parameters
25/// * T: input
26/// * U: output
27/// * S: streamer
28#[async_trait]
29impl<'a, T, U, S, C> Pipe<T, U, S, C> for Streamer<'a>
30where
31    T: Send + 'static,
32    U: Clone + Send + 'static,
33    S: Stream<T, U, C> + 'static,
34    C: ConfigInto<S> + Send + Sync + 'static,
35{
36    async fn run(self, config: C, channels: PipeChannels<T, U>) -> Result<()> {
37        let name = self.name;
38        let context = self.context;
39        let etx = self.etx;
40        let (mut rx, txs) = channels.into_channels();
41        assert!(rx.is_some(), "streamer '{}' has no upstreams", name);
42        assert!(!txs.is_empty(), "streamer '{}' has no downstreams", name);
43        let (tx0, mut rx0) = channel::<U>(1024);
44        let mut streamer = config.config_into().await?;
45        streamer.set_sender(tx0);
46        let pipe_name = name.to_owned();
47        let join_stream = tokio::spawn(async move {
48            let rx = rx.as_mut().unwrap();
49            info!(
50                name = pipe_name.as_str(),
51                ty = "streamer",
52                thread = "stream",
53                "run ..."
54            );
55            loop {
56                context.set_state(State::Receive);
57                let t = match (*rx).recv().await {
58                    Some(t) => t,
59                    None => break,
60                };
61                context.set_state(State::Send);
62                match streamer.stream(t).await {
63                    Ok(_) => (),
64                    Err(err) => {
65                        error!(
66                            name = pipe_name.as_str(),
67                            ty = "streamer",
68                            thread = "stream",
69                            "error '{:#?}'",
70                            err
71                        );
72                        send_pipe_error(etx.as_ref(), PipeError::new(pipe_name.clone(), err)).await;
73                        context.inc_failure_run();
74                    }
75                }
76                context.inc_total_run();
77            }
78            info!(
79                name = pipe_name.as_str(),
80                ty = "streamer",
81                thread = "stream",
82                "exit ..."
83            );
84            context.set_state(State::Done);
85        });
86        let mut txs = senders_as_map(txs);
87        let pipe_name = name.to_owned();
88        // start send
89        let join_send = tokio::spawn(async move {
90            info!(
91                name = pipe_name.as_str(),
92                ty = "streamer",
93                thread = "send",
94                "run ..."
95            );
96            loop {
97                // if all receiver dropped, sender drop as well
98                match txs.is_empty() {
99                    true => {
100                        break;
101                    }
102                    false => (),
103                }
104                let u = match rx0.recv().await {
105                    Some(u) => u,
106                    None => {
107                        break;
108                    }
109                };
110                let mut u_replicas = replicate(u, txs.len());
111                let jhs: HashMap<usize, JoinHandle<core::result::Result<(), SendError<U>>>> = txs
112                    .iter()
113                    .map(|(idx, tx)| {
114                        (
115                            idx.to_owned(),
116                            spawn_send(tx.to_owned(), u_replicas.pop().expect("no replica left")),
117                        )
118                    })
119                    .collect();
120                assert!(u_replicas.is_empty(), "replica leftover");
121                let drop_sender_indices = wait_join_handles(jhs).await;
122                filter_senders_by_indices(&mut txs, drop_sender_indices);
123            }
124            info!(
125                name = pipe_name.as_str(),
126                ty = "streamer",
127                thread = "send",
128                "exit ..."
129            );
130        });
131        // join stream and send
132        let pipe_name = name.to_owned();
133        match tokio::spawn(async move { tokio::join!(join_stream, join_send) }).await {
134            Ok(_) => (),
135            Err(err) => {
136                error!(
137                    name = pipe_name.as_str(),
138                    ty = "streamer",
139                    thread = "join",
140                    "join error '{:#?}'",
141                    err
142                )
143            }
144        }
145        Ok(())
146    }
147}
148
149impl<'a> HasContext for Streamer<'a> {
150    fn get_name(&self) -> String {
151        self.name.to_owned()
152    }
153
154    fn get_context(&self) -> Arc<Context> {
155        self.context.clone()
156    }
157}
158
159impl<'a> SubscribeError for Streamer<'a> {
160    fn subscribe_error(&mut self, tx: Sender<crate::common::PipeError>) {
161        self.etx = Some(tx)
162    }
163}
164
165impl<'a> Streamer<'a> {
166    pub fn new(name: &'a str) -> Self {
167        Streamer {
168            name,
169            context: Default::default(),
170            etx: None,
171        }
172    }
173}
174
/// Shorthand for `Streamer::new(name)`.
///
/// `Streamer` is resolved at the call site, so it has to be in scope where the macro is used.
#[macro_export]
macro_rules! streamer {
    ($pipe_name:expr) => {{
        Streamer::new($pipe_name)
    }};
}