arkflow_core/engine/
mod.rs

1use crate::config::EngineConfig;
2use std::process;
3use tracing::{error, info};
4
5pub struct Engine {
6    config: EngineConfig,
7}
8impl Engine {
9    /// Create a new engine
10    pub fn new(config: EngineConfig) -> Self {
11        Self { config }
12    }
13    /// Run the engine
14    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
15        // Create and run all flows
16        let mut streams = Vec::new();
17        let mut handles = Vec::new();
18
19        for (i, stream_config) in self.config.streams.iter().enumerate() {
20            info!("Initializing flow #{}", i + 1);
21
22            match stream_config.build() {
23                Ok(stream) => {
24                    streams.push(stream);
25                }
26                Err(e) => {
27                    error!("Initializing flow #{} error: {}", i + 1, e);
28                    process::exit(1);
29                }
30            }
31        }
32
33        for (i, mut stream) in streams.into_iter().enumerate() {
34            info!("Starting flow #{}", i + 1);
35
36            let handle = tokio::spawn(async move {
37                match stream.run().await {
38                    Ok(_) => info!("Flow #{} completed successfully", i + 1),
39                    Err(e) => {
40                        error!("Flow #{} ran with error: {}", i + 1, e)
41                    }
42                }
43            });
44
45            handles.push(handle);
46        }
47
48        // Wait for all flows to complete
49        for handle in handles {
50            handle.await?;
51        }
52
53        info!("All flow tasks have been complete");
54        Ok(())
55    }
56}