Skip to main content

acts_next/
engine.rs

1use crate::{
2    ChannelOptions, Signal,
3    config::Config,
4    export::{Channel, Executor, Extender},
5    package,
6    scheduler::Runtime,
7};
8
9use std::sync::Arc;
10use tracing::info;
11
/// Workflow Engine
///
/// The engine holds its runtime and extender behind [`Arc`], so cloning an
/// `Engine` yields another handle that shares the same runtime and extender.
///
/// ## Example:
/// An example that calculates the result from 1 to the given input value.
///
/// ```rust,no_run
/// use acts::{Engine, Workflow, Vars};
///
/// #[tokio::main]
/// async fn main() {
///     let engine = Engine::new().start();
///
///     let model = include_str!("../../examples/simple/model.yml");
///     let workflow = Workflow::from_yml(model).unwrap();
///
///     engine.channel().on_complete(|e| {
///         println!("{:?}", e.outputs);
///     });
///     let exec = engine.executor();
///     exec.model().deploy(&workflow).expect("fail to deploy workflow");
///
///     let mut vars = Vars::new();
///     vars.insert("input".into(), 3.into());
///     vars.insert("pid".into(), "test1".into());
///     exec.proc().start(
///        &workflow.id,
///        &vars);
/// }
/// ```
#[derive(Clone)]
pub struct Engine {
    /// Shared runtime that the executor, channels and extender are built from.
    runtime: Arc<Runtime>,
    /// Shared extender, created once when the engine is constructed.
    extender: Arc<Extender>,
}
46
47impl Default for Engine {
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53impl Engine {
54    pub fn new() -> Self {
55        Self::new_with_config(&Config::default())
56    }
57
58    pub fn config(&self) -> Arc<Config> {
59        self.runtime.config().clone()
60    }
61
62    /// engine executor
63    pub fn executor(&self) -> Arc<Executor> {
64        Arc::new(Executor::new(&self.runtime))
65    }
66
67    /// event channel (default to not support re-send)
68    pub fn channel(&self) -> Arc<Channel> {
69        Arc::new(Channel::new(&self.runtime))
70    }
71
72    /// create named channel to receive messages
73    /// if setting the emit_id by [`ChannelOptions`] it will check the status and re-send when not acking
74    /// # Example
75    /// ```no_run
76    /// use acts::{ Engine, ChannelOptions };
77    ///
78    /// let engine = Engine::new().start();
79    /// let chan = engine.channel_with_options(&ChannelOptions {  
80    ///     id: "chan1".to_string(),  
81    ///     ack: true,  
82    ///     r#type: "step".to_string(),
83    ///     key: "my_key*".to_string(),
84    ///     state: "{created, completed}".to_string(),
85    ///     uses: "my_package".to_string(),
86    ///     tag: "*".to_string()  
87    /// });
88    /// chan.on_message(|e| {
89    ///     // do something
90    /// });
91    /// ```
92    pub fn channel_with_options(&self, matcher: &ChannelOptions) -> Arc<Channel> {
93        Arc::new(Channel::channel(&self.runtime, matcher))
94    }
95
96    /// engine extender
97    pub fn extender(&self) -> Arc<Extender> {
98        self.extender.clone()
99    }
100
101    pub(crate) fn runtime(&self) -> Arc<Runtime> {
102        self.runtime.clone()
103    }
104
105    /// close engine
106    ///
107    /// ## Example
108    ///
109    /// ```rust,no_run
110    /// use acts::{Engine, Workflow, Vars};
111    /// #[tokio::main]
112    /// async fn main() {
113    ///     let engine = Engine::new().start();
114    ///     engine.close();
115    /// }
116    /// ```
117    pub fn close(&self) {
118        info!("close");
119        self.runtime.scher().close();
120    }
121
122    pub fn signal<T: Clone>(&self, init: T) -> Signal<T> {
123        Signal::new(init)
124    }
125
126    pub fn is_running(&self) -> bool {
127        self.runtime.is_running()
128    }
129
130    fn init(&self) {
131        info!("init");
132        self.runtime.init(self);
133        package::init(self);
134    }
135
136    pub(crate) fn new_with_config(config: &Config) -> Self {
137        info!("config: {:?}", config);
138        let runtime = Runtime::new(config);
139
140        let extender = Arc::new(Extender::new(&runtime));
141        Self { runtime, extender }
142    }
143
144    pub fn start(self) -> Self {
145        self.init();
146        self
147    }
148}