arkflow_core/engine/
mod.rs1use crate::config::EngineConfig;
2use std::process;
3use tracing::{error, info};
4
5pub struct Engine {
6 config: EngineConfig,
7}
8impl Engine {
9 pub fn new(config: EngineConfig) -> Self {
11 Self { config }
12 }
13 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
15 let mut streams = Vec::new();
17 let mut handles = Vec::new();
18
19 for (i, stream_config) in self.config.streams.iter().enumerate() {
20 info!("Initializing flow #{}", i + 1);
21
22 match stream_config.build() {
23 Ok(stream) => {
24 streams.push(stream);
25 }
26 Err(e) => {
27 error!("Initializing flow #{} error: {}", i + 1, e);
28 process::exit(1);
29 }
30 }
31 }
32
33 for (i, mut stream) in streams.into_iter().enumerate() {
34 info!("Starting flow #{}", i + 1);
35
36 let handle = tokio::spawn(async move {
37 match stream.run().await {
38 Ok(_) => info!("Flow #{} completed successfully", i + 1),
39 Err(e) => {
40 error!("Flow #{} ran with error: {}", i + 1, e)
41 }
42 }
43 });
44
45 handles.push(handle);
46 }
47
48 for handle in handles {
50 handle.await?;
51 }
52
53 info!("All flow tasks have been complete");
54 Ok(())
55 }
56}