pipebase/listen/
runtime.rs

1use async_trait::async_trait;
2use tokio::sync::mpsc::{error::SendError, Sender};
3use tokio::task::JoinHandle;
4use tracing::{error, info};
5
6use super::Listen;
7use crate::common::{
8    filter_senders_by_indices, replicate, send_pipe_error, senders_as_map, spawn_send,
9    wait_join_handles, ConfigInto, Context, HasContext, Pipe, PipeChannels, PipeError, Result,
10    State, SubscribeError,
11};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::mpsc::channel;
15
16pub struct Listener<'a> {
17    name: &'a str,
18    context: Arc<Context>,
19    etx: Option<Sender<PipeError>>,
20}
21
22/// Spawn two tasks
23/// * Run listener
24/// * Receive data from listener and send downstreams
25/// # Parameters
26/// * U: output
27/// * L: listener
28#[async_trait]
29impl<'a, U, L, C> Pipe<(), U, L, C> for Listener<'a>
30where
31    U: Clone + Send + 'static,
32    L: Listen<U, C> + 'static,
33    C: ConfigInto<L> + Send + Sync + 'static,
34{
35    async fn run(self, config: C, channels: PipeChannels<(), U>) -> Result<()> {
36        let name = self.name;
37        let context = self.context;
38        let etx = self.etx;
39        let (rx, txs) = channels.into_channels();
40        assert!(rx.is_none(), "listener '{}' has invalid upstreams", name);
41        assert!(!txs.is_empty(), "listener '{}' has no downstreams", name);
42        let (tx0, mut rx0) = channel::<U>(1024);
43        let mut listener = config.config_into().await?;
44        listener.set_sender(tx0);
45        let pipe_name = name.to_owned();
46        // start listen
47        let join_listen = tokio::spawn(async move {
48            info!(
49                name = pipe_name.as_str(),
50                ty = "listener",
51                thread = "listen",
52                "run ..."
53            );
54            match listener.run().await {
55                Ok(_) => info!(
56                    name = pipe_name.as_str(),
57                    ty = "listener",
58                    thread = "listen",
59                    "exit ..."
60                ),
61                Err(err) => {
62                    error!(
63                        name = pipe_name.as_str(),
64                        ty = "listener",
65                        thread = "listen",
66                        "exit with error '{:#?}'",
67                        err
68                    );
69                    send_pipe_error(etx.as_ref(), PipeError::new(pipe_name, err)).await
70                }
71            };
72        });
73        // start send
74        let mut txs = senders_as_map(txs);
75        let pipe_name = name.to_owned();
76        let join_send = tokio::spawn(async move {
77            info!(
78                name = pipe_name.as_str(),
79                ty = "listener",
80                thread = "send",
81                "run ..."
82            );
83            loop {
84                context.set_state(State::Receive);
85                // if all receiver dropped, sender drop as well
86                match txs.is_empty() {
87                    true => {
88                        break;
89                    }
90                    false => (),
91                }
92                let u = match rx0.recv().await {
93                    Some(u) => u,
94                    None => {
95                        break;
96                    }
97                };
98                context.set_state(State::Send);
99                let mut u_replicas = replicate(u, txs.len());
100                let jhs: HashMap<usize, JoinHandle<core::result::Result<(), SendError<U>>>> = txs
101                    .iter()
102                    .map(|(idx, tx)| {
103                        (
104                            idx.to_owned(),
105                            spawn_send(tx.to_owned(), u_replicas.pop().expect("no replica left")),
106                        )
107                    })
108                    .collect();
109                assert!(u_replicas.is_empty(), "replica leftover");
110                let drop_sender_indices = wait_join_handles(jhs).await;
111                filter_senders_by_indices(&mut txs, drop_sender_indices);
112                context.inc_total_run();
113            }
114            info!(
115                name = pipe_name.as_str(),
116                ty = "listener",
117                thread = "send",
118                "exit ..."
119            );
120            context.set_state(State::Done);
121        });
122        // join listen and send
123        let pipe_name = name.to_owned();
124        match tokio::spawn(async move { tokio::join!(join_listen, join_send) }).await {
125            Ok(_) => (),
126            Err(err) => {
127                error!(
128                    name = pipe_name.as_str(),
129                    ty = "listener",
130                    thread = "join",
131                    "join error {:#?}",
132                    err
133                )
134            }
135        }
136        Ok(())
137    }
138}
139
140impl<'a> HasContext for Listener<'a> {
141    fn get_name(&self) -> String {
142        self.name.to_owned()
143    }
144
145    fn get_context(&self) -> Arc<Context> {
146        self.context.clone()
147    }
148}
149
150impl<'a> Listener<'a> {
151    pub fn new(name: &'a str) -> Self {
152        Listener {
153            name,
154            context: Default::default(),
155            etx: None,
156        }
157    }
158}
159
160impl<'a> SubscribeError for Listener<'a> {
161    fn subscribe_error(&mut self, tx: Sender<crate::common::PipeError>) {
162        self.etx = Some(tx)
163    }
164}
165
166#[macro_export]
167macro_rules! listener {
168    (
169        $name:expr
170    ) => {{
171        Listener::new($name)
172    }};
173}