pipebase/listen/
runtime.rs1use async_trait::async_trait;
2use tokio::sync::mpsc::{error::SendError, Sender};
3use tokio::task::JoinHandle;
4use tracing::{error, info};
5
6use super::Listen;
7use crate::common::{
8 filter_senders_by_indices, replicate, send_pipe_error, senders_as_map, spawn_send,
9 wait_join_handles, ConfigInto, Context, HasContext, Pipe, PipeChannels, PipeError, Result,
10 State, SubscribeError,
11};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::mpsc::channel;
15
16pub struct Listener<'a> {
17 name: &'a str,
18 context: Arc<Context>,
19 etx: Option<Sender<PipeError>>,
20}
21
22#[async_trait]
29impl<'a, U, L, C> Pipe<(), U, L, C> for Listener<'a>
30where
31 U: Clone + Send + 'static,
32 L: Listen<U, C> + 'static,
33 C: ConfigInto<L> + Send + Sync + 'static,
34{
35 async fn run(self, config: C, channels: PipeChannels<(), U>) -> Result<()> {
36 let name = self.name;
37 let context = self.context;
38 let etx = self.etx;
39 let (rx, txs) = channels.into_channels();
40 assert!(rx.is_none(), "listener '{}' has invalid upstreams", name);
41 assert!(!txs.is_empty(), "listener '{}' has no downstreams", name);
42 let (tx0, mut rx0) = channel::<U>(1024);
43 let mut listener = config.config_into().await?;
44 listener.set_sender(tx0);
45 let pipe_name = name.to_owned();
46 let join_listen = tokio::spawn(async move {
48 info!(
49 name = pipe_name.as_str(),
50 ty = "listener",
51 thread = "listen",
52 "run ..."
53 );
54 match listener.run().await {
55 Ok(_) => info!(
56 name = pipe_name.as_str(),
57 ty = "listener",
58 thread = "listen",
59 "exit ..."
60 ),
61 Err(err) => {
62 error!(
63 name = pipe_name.as_str(),
64 ty = "listener",
65 thread = "listen",
66 "exit with error '{:#?}'",
67 err
68 );
69 send_pipe_error(etx.as_ref(), PipeError::new(pipe_name, err)).await
70 }
71 };
72 });
73 let mut txs = senders_as_map(txs);
75 let pipe_name = name.to_owned();
76 let join_send = tokio::spawn(async move {
77 info!(
78 name = pipe_name.as_str(),
79 ty = "listener",
80 thread = "send",
81 "run ..."
82 );
83 loop {
84 context.set_state(State::Receive);
85 match txs.is_empty() {
87 true => {
88 break;
89 }
90 false => (),
91 }
92 let u = match rx0.recv().await {
93 Some(u) => u,
94 None => {
95 break;
96 }
97 };
98 context.set_state(State::Send);
99 let mut u_replicas = replicate(u, txs.len());
100 let jhs: HashMap<usize, JoinHandle<core::result::Result<(), SendError<U>>>> = txs
101 .iter()
102 .map(|(idx, tx)| {
103 (
104 idx.to_owned(),
105 spawn_send(tx.to_owned(), u_replicas.pop().expect("no replica left")),
106 )
107 })
108 .collect();
109 assert!(u_replicas.is_empty(), "replica leftover");
110 let drop_sender_indices = wait_join_handles(jhs).await;
111 filter_senders_by_indices(&mut txs, drop_sender_indices);
112 context.inc_total_run();
113 }
114 info!(
115 name = pipe_name.as_str(),
116 ty = "listener",
117 thread = "send",
118 "exit ..."
119 );
120 context.set_state(State::Done);
121 });
122 let pipe_name = name.to_owned();
124 match tokio::spawn(async move { tokio::join!(join_listen, join_send) }).await {
125 Ok(_) => (),
126 Err(err) => {
127 error!(
128 name = pipe_name.as_str(),
129 ty = "listener",
130 thread = "join",
131 "join error {:#?}",
132 err
133 )
134 }
135 }
136 Ok(())
137 }
138}
139
140impl<'a> HasContext for Listener<'a> {
141 fn get_name(&self) -> String {
142 self.name.to_owned()
143 }
144
145 fn get_context(&self) -> Arc<Context> {
146 self.context.clone()
147 }
148}
149
150impl<'a> Listener<'a> {
151 pub fn new(name: &'a str) -> Self {
152 Listener {
153 name,
154 context: Default::default(),
155 etx: None,
156 }
157 }
158}
159
160impl<'a> SubscribeError for Listener<'a> {
161 fn subscribe_error(&mut self, tx: Sender<crate::common::PipeError>) {
162 self.etx = Some(tx)
163 }
164}
165
166#[macro_export]
167macro_rules! listener {
168 (
169 $name:expr
170 ) => {{
171 Listener::new($name)
172 }};
173}