arkflow_core/stream/
mod.rs1use crate::input::Ack;
6use crate::{input::Input, output::Output, pipeline::Pipeline, Error, MessageBatch};
7use flume::Sender;
8use std::sync::Arc;
9use tokio::signal::unix::{signal, SignalKind};
10use tracing::{debug, error, info};
11use waitgroup::{WaitGroup, Worker};
12
13pub struct Stream {
15 input: Arc<dyn Input>,
16 pipeline: Arc<Pipeline>,
17 output: Arc<dyn Output>,
18 thread_num: u32,
19}
20
21impl Stream {
22 pub fn new(
24 input: Arc<dyn Input>,
25 pipeline: Pipeline,
26 output: Arc<dyn Output>,
27 thread_num: u32,
28 ) -> Self {
29 Self {
30 input,
31 pipeline: Arc::new(pipeline),
32 output,
33 thread_num,
34 }
35 }
36
37 pub async fn run(&mut self) -> Result<(), Error> {
39 self.input.connect().await?;
41 self.output.connect().await?;
42
43 let (input_sender, input_receiver) =
44 flume::bounded::<(MessageBatch, Arc<dyn Ack>)>(self.thread_num as usize * 4);
45 let (output_sender, output_receiver) =
46 flume::bounded::<(Vec<MessageBatch>, Arc<dyn Ack>)>(self.thread_num as usize * 4);
47 let input = Arc::clone(&self.input);
48
49 let wg = WaitGroup::new();
50
51 let worker = wg.worker();
52 tokio::spawn(Self::do_input(input, input_sender, worker));
53
54 for i in 0..self.thread_num {
55 let pipeline = self.pipeline.clone();
56 let input_receiver = input_receiver.clone();
57 let output_sender = output_sender.clone();
58 let worker = wg.worker();
59 tokio::spawn(async move {
60 let _worker = worker;
61 let i = i + 1;
62 info!("Worker {} started", i);
63 loop {
64 match input_receiver.recv_async().await {
65 Ok((msg, ack)) => {
66 debug!("Processing input message: {:?}", &msg.as_string());
68 let processed = pipeline.process(msg).await;
69
70 match processed {
72 Ok(msgs) => {
73 for x in &msgs {
74 debug!("Processing output message: {:?}", x.as_string());
75 }
76
77 if let Err(e) = output_sender.send_async((msgs, ack)).await {
78 error!("Failed to send processed message: {}", e);
79 break;
80 }
81 }
82 Err(e) => {
83 error!("{}", e)
84 }
85 }
86 }
87 Err(_e) => {
88 break;
89 }
90 }
91 }
92 info!("Worker {} stopped", i);
93 });
94 }
95
96 drop(output_sender);
97 loop {
98 match output_receiver.recv_async().await {
99 Ok(msg) => {
100 let size = &msg.0.len();
101 let mut success_cnt = 0;
102 for x in &msg.0 {
103 match self.output.write(x).await {
104 Ok(_) => {
105 success_cnt = success_cnt + 1;
106 }
107 Err(e) => {
108 error!("{}", e);
109 }
110 }
111 }
112
113 if *size == success_cnt {
115 msg.1.ack().await;
116 }
117 }
118 Err(_) => {
119 break;
120 }
121 }
122 }
123
124 wg.wait();
125
126 info!("Closing....");
127 self.close().await?;
128 info!("close.");
129
130 Ok(())
131 }
132
133 async fn do_input(
134 input: Arc<dyn Input>,
135 input_sender: Sender<(MessageBatch, Arc<dyn Ack>)>,
136 _worker: Worker,
137 ) {
138 let mut sigint = signal(SignalKind::interrupt()).expect("Failed to set signal handler");
140 let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set signal handler");
141
142 loop {
143 tokio::select! {
144 _ = sigint.recv() => {
145 info!("Received SIGINT, exiting...");
146 break;
147 },
148 _ = sigterm.recv() => {
149 info!("Received SIGTERM, exiting...");
150 break;
151 },
152 result = input.read() =>{
153 match result {
154 Ok(msg) => {
155 debug!("Received input message: {:?}", &msg.0.as_string());
156 if let Err(e) = input_sender.send_async(msg).await {
157 error!("Failed to send input message: {}", e);
158 break;
159 }
160 }
161 Err(e) => {
162 match e {
163 Error::EOF => {
164 return;
166 }
167 Error::Disconnection => loop {
168 match input.connect().await {
169 Ok(_) => {
170 info!("input reconnected");
171 break;
172 }
173 Err(e) => {
174 error!("{}", e);
175 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
176 }
177 };
178 },
179 Error::Config(e) => {
180 error!("{}", e);
181 break;
182 }
183 _ => {
184 error!("{}", e);
185 }
186 };
187 }
188 };
189 }
190 };
191 }
192 info!("input stopped");
193 }
194
195 pub async fn close(&mut self) -> Result<(), Error> {
196 self.input.close().await?;
198 self.pipeline.close().await?;
199 self.output.close().await?;
200 Ok(())
201 }
202}
203
204#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
206pub struct StreamConfig {
207 pub input: crate::input::InputConfig,
208 pub pipeline: crate::pipeline::PipelineConfig,
209 pub output: crate::output::OutputConfig,
210}
211
212impl StreamConfig {
213 pub fn build(&self) -> Result<Stream, Error> {
215 let input = self.input.build()?;
216 let (pipeline, thread_num) = self.pipeline.build()?;
217 let output = self.output.build()?;
218
219 Ok(Stream::new(input, pipeline, output, thread_num))
220 }
221}