pipebase/stream/
runtime.rs1use async_trait::async_trait;
2use std::collections::HashMap;
3use std::sync::Arc;
4use tokio::sync::mpsc::{channel, error::SendError, Sender};
5use tokio::task::JoinHandle;
6use tracing::{error, info};
7
8use super::Stream;
9use crate::common::{
10 filter_senders_by_indices, replicate, send_pipe_error, senders_as_map, spawn_send,
11 wait_join_handles, ConfigInto, Context, HasContext, Pipe, PipeChannels, PipeError, Result,
12 State, SubscribeError,
13};
14
/// Pipe runtime that drives a `Stream` implementation between one upstream
/// receiver and one or more downstream senders.
pub struct Streamer<'a> {
    /// Pipe name; attached to every log event and to reported `PipeError`s.
    name: &'a str,
    /// Shared context on which the run loop records its state and run counters.
    context: Arc<Context>,
    /// Optional error channel, populated through `SubscribeError::subscribe_error`.
    etx: Option<Sender<PipeError>>,
}
20
21#[async_trait]
29impl<'a, T, U, S, C> Pipe<T, U, S, C> for Streamer<'a>
30where
31 T: Send + 'static,
32 U: Clone + Send + 'static,
33 S: Stream<T, U, C> + 'static,
34 C: ConfigInto<S> + Send + Sync + 'static,
35{
36 async fn run(self, config: C, channels: PipeChannels<T, U>) -> Result<()> {
37 let name = self.name;
38 let context = self.context;
39 let etx = self.etx;
40 let (mut rx, txs) = channels.into_channels();
41 assert!(rx.is_some(), "streamer '{}' has no upstreams", name);
42 assert!(!txs.is_empty(), "streamer '{}' has no downstreams", name);
43 let (tx0, mut rx0) = channel::<U>(1024);
44 let mut streamer = config.config_into().await?;
45 streamer.set_sender(tx0);
46 let pipe_name = name.to_owned();
47 let join_stream = tokio::spawn(async move {
48 let rx = rx.as_mut().unwrap();
49 info!(
50 name = pipe_name.as_str(),
51 ty = "streamer",
52 thread = "stream",
53 "run ..."
54 );
55 loop {
56 context.set_state(State::Receive);
57 let t = match (*rx).recv().await {
58 Some(t) => t,
59 None => break,
60 };
61 context.set_state(State::Send);
62 match streamer.stream(t).await {
63 Ok(_) => (),
64 Err(err) => {
65 error!(
66 name = pipe_name.as_str(),
67 ty = "streamer",
68 thread = "stream",
69 "error '{:#?}'",
70 err
71 );
72 send_pipe_error(etx.as_ref(), PipeError::new(pipe_name.clone(), err)).await;
73 context.inc_failure_run();
74 }
75 }
76 context.inc_total_run();
77 }
78 info!(
79 name = pipe_name.as_str(),
80 ty = "streamer",
81 thread = "stream",
82 "exit ..."
83 );
84 context.set_state(State::Done);
85 });
86 let mut txs = senders_as_map(txs);
87 let pipe_name = name.to_owned();
88 let join_send = tokio::spawn(async move {
90 info!(
91 name = pipe_name.as_str(),
92 ty = "streamer",
93 thread = "send",
94 "run ..."
95 );
96 loop {
97 match txs.is_empty() {
99 true => {
100 break;
101 }
102 false => (),
103 }
104 let u = match rx0.recv().await {
105 Some(u) => u,
106 None => {
107 break;
108 }
109 };
110 let mut u_replicas = replicate(u, txs.len());
111 let jhs: HashMap<usize, JoinHandle<core::result::Result<(), SendError<U>>>> = txs
112 .iter()
113 .map(|(idx, tx)| {
114 (
115 idx.to_owned(),
116 spawn_send(tx.to_owned(), u_replicas.pop().expect("no replica left")),
117 )
118 })
119 .collect();
120 assert!(u_replicas.is_empty(), "replica leftover");
121 let drop_sender_indices = wait_join_handles(jhs).await;
122 filter_senders_by_indices(&mut txs, drop_sender_indices);
123 }
124 info!(
125 name = pipe_name.as_str(),
126 ty = "streamer",
127 thread = "send",
128 "exit ..."
129 );
130 });
131 let pipe_name = name.to_owned();
133 match tokio::spawn(async move { tokio::join!(join_stream, join_send) }).await {
134 Ok(_) => (),
135 Err(err) => {
136 error!(
137 name = pipe_name.as_str(),
138 ty = "streamer",
139 thread = "join",
140 "join error '{:#?}'",
141 err
142 )
143 }
144 }
145 Ok(())
146 }
147}
148
149impl<'a> HasContext for Streamer<'a> {
150 fn get_name(&self) -> String {
151 self.name.to_owned()
152 }
153
154 fn get_context(&self) -> Arc<Context> {
155 self.context.clone()
156 }
157}
158
159impl<'a> SubscribeError for Streamer<'a> {
160 fn subscribe_error(&mut self, tx: Sender<crate::common::PipeError>) {
161 self.etx = Some(tx)
162 }
163}
164
165impl<'a> Streamer<'a> {
166 pub fn new(name: &'a str) -> Self {
167 Streamer {
168 name,
169 context: Default::default(),
170 etx: None,
171 }
172 }
173}
174
/// Shorthand for `Streamer::new($name)`.
///
/// The expansion names `Streamer` without a path, so the call site must have
/// `Streamer` in scope.
#[macro_export]
macro_rules! streamer {
    (
        $name:expr
    ) => {{
        Streamer::new($name)
    }};
}