arkflow_core/stream/
mod.rs

1//! Stream component module
2//!
3//! A stream is a complete data processing unit, containing input, pipeline, and output.
4
5use crate::input::Ack;
6use crate::{input::Input, output::Output, pipeline::Pipeline, Error, MessageBatch};
7use flume::Sender;
8use std::sync::Arc;
9use tokio::signal::unix::{signal, SignalKind};
10use tracing::{debug, error, info};
11use waitgroup::{WaitGroup, Worker};
12
13/// A stream structure, containing input, pipe, output, and an optional buffer.
14pub struct Stream {
15    input: Arc<dyn Input>,
16    pipeline: Arc<Pipeline>,
17    output: Arc<dyn Output>,
18    thread_num: u32,
19}
20
21impl Stream {
22    /// Create a new stream.
23    pub fn new(
24        input: Arc<dyn Input>,
25        pipeline: Pipeline,
26        output: Arc<dyn Output>,
27        thread_num: u32,
28    ) -> Self {
29        Self {
30            input,
31            pipeline: Arc::new(pipeline),
32            output,
33            thread_num,
34        }
35    }
36
37    /// Running stream processing
38    pub async fn run(&mut self) -> Result<(), Error> {
39        // Connect input and output
40        self.input.connect().await?;
41        self.output.connect().await?;
42
43        let (input_sender, input_receiver) =
44            flume::bounded::<(MessageBatch, Arc<dyn Ack>)>(self.thread_num as usize * 4);
45        let (output_sender, output_receiver) =
46            flume::bounded::<(Vec<MessageBatch>, Arc<dyn Ack>)>(self.thread_num as usize * 4);
47        let input = Arc::clone(&self.input);
48
49        let wg = WaitGroup::new();
50
51        let worker = wg.worker();
52        tokio::spawn(Self::do_input(input, input_sender, worker));
53
54        for i in 0..self.thread_num {
55            let pipeline = self.pipeline.clone();
56            let input_receiver = input_receiver.clone();
57            let output_sender = output_sender.clone();
58            let worker = wg.worker();
59            tokio::spawn(async move {
60                let _worker = worker;
61                let i = i + 1;
62                info!("Worker {} started", i);
63                loop {
64                    match input_receiver.recv_async().await {
65                        Ok((msg, ack)) => {
66                            // Process messages through pipeline
67                            debug!("Processing input message: {:?}", &msg.as_string());
68                            let processed = pipeline.process(msg).await;
69
70                            // Process result messages
71                            match processed {
72                                Ok(msgs) => {
73                                    for x in &msgs {
74                                        debug!("Processing output message: {:?}", x.as_string());
75                                    }
76
77                                    if let Err(e) = output_sender.send_async((msgs, ack)).await {
78                                        error!("Failed to send processed message: {}", e);
79                                        break;
80                                    }
81                                }
82                                Err(e) => {
83                                    error!("{}", e)
84                                }
85                            }
86                        }
87                        Err(_e) => {
88                            break;
89                        }
90                    }
91                }
92                info!("Worker {} stopped", i);
93            });
94        }
95
96        drop(output_sender);
97        loop {
98            match output_receiver.recv_async().await {
99                Ok(msg) => {
100                    let size = &msg.0.len();
101                    let mut success_cnt = 0;
102                    for x in &msg.0 {
103                        match self.output.write(x).await {
104                            Ok(_) => {
105                                success_cnt = success_cnt + 1;
106                            }
107                            Err(e) => {
108                                error!("{}", e);
109                            }
110                        }
111                    }
112
113                    // Confirm that the message has been successfully processed
114                    if *size == success_cnt {
115                        msg.1.ack().await;
116                    }
117                }
118                Err(_) => {
119                    break;
120                }
121            }
122        }
123
124        wg.wait();
125
126        info!("Closing....");
127        self.close().await?;
128        info!("close.");
129
130        Ok(())
131    }
132
133    async fn do_input(
134        input: Arc<dyn Input>,
135        input_sender: Sender<(MessageBatch, Arc<dyn Ack>)>,
136        _worker: Worker,
137    ) {
138        // Set up signal handlers
139        let mut sigint = signal(SignalKind::interrupt()).expect("Failed to set signal handler");
140        let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set signal handler");
141
142        loop {
143            tokio::select! {
144                _ = sigint.recv() => {
145                    info!("Received SIGINT, exiting...");
146                    break;
147                },
148                _ = sigterm.recv() => {
149                    info!("Received SIGTERM, exiting...");
150                    break;
151                },
152                result = input.read() =>{
153                    match result {
154                    Ok(msg) => {
155                        debug!("Received input message: {:?}", &msg.0.as_string());
156                        if let Err(e) = input_sender.send_async(msg).await {
157                            error!("Failed to send input message: {}", e);
158                            break;
159                        }
160                    }
161                    Err(e) => {
162                        match e {
163                            Error::EOF => {
164                                // When input is complete, close the sender to notify all workers
165                                return;
166                            }
167                            Error::Disconnection => loop {
168                                match input.connect().await {
169                                    Ok(_) => {
170                                        info!("input reconnected");
171                                        break;
172                                    }
173                                    Err(e) => {
174                                        error!("{}", e);
175                                        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
176                                    }
177                                };
178                            },
179                            Error::Config(e) => {
180                                error!("{}", e);
181                                break;
182                            }
183                            _ => {
184                                error!("{}", e);
185                            }
186                        };
187                    }
188                    };
189                }
190            };
191        }
192        info!("input stopped");
193    }
194
195    pub async fn close(&mut self) -> Result<(), Error> {
196        // Closing order: input -> pipeline -> buffer -> output
197        self.input.close().await?;
198        self.pipeline.close().await?;
199        self.output.close().await?;
200        Ok(())
201    }
202}
203
204/// 流配置
205#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
206pub struct StreamConfig {
207    pub input: crate::input::InputConfig,
208    pub pipeline: crate::pipeline::PipelineConfig,
209    pub output: crate::output::OutputConfig,
210}
211
212impl StreamConfig {
213    /// 根据配置构建流
214    pub fn build(&self) -> Result<Stream, Error> {
215        let input = self.input.build()?;
216        let (pipeline, thread_num) = self.pipeline.build()?;
217        let output = self.output.build()?;
218
219        Ok(Stream::new(input, pipeline, output, thread_num))
220    }
221}