acts_next/
engine.rs

1use crate::{
2    adapter::{self, Adapter},
3    config::Config,
4    export::{Channel, Executor, Extender},
5    plugin,
6    scheduler::Runtime,
7    ActPlugin, ChannelOptions, Signal, StoreAdapter,
8};
9
10use std::sync::{Arc, Mutex};
11use tracing::info;
12
13/// Workflow Engine
14///
15/// ## Example:
16/// a example to caculate the result from 1 to given input value
17///
18///```rust,no_run
19/// use acts::{Engine, Workflow, Vars};
20///
21/// #[tokio::main]
22/// async fn main() {
23///     let engine = Engine::new();
24///
25///     let model = include_str!("../examples/simple/model.yml");
26///     let workflow = Workflow::from_yml(model).unwrap();
27///     
28///     engine.channel().on_complete(|e| {
29///         println!("{:?}", e.outputs);
30///     });
31///     let exec = engine.executor();
32///     exec.model().deploy(&workflow).expect("fail to deploy workflow");
33///
34///     let mut vars = Vars::new();
35///     vars.insert("input".into(), 3.into());
36///     vars.insert("pid".into(), "test1".into());
37///     exec.proc().start(
38///        &workflow.id,
39///        &vars);
40/// }
41/// ```
42#[derive(Clone)]
43pub struct Engine {
44    runtime: Arc<Runtime>,
45    adapter: Arc<Adapter>,
46    extender: Arc<Extender>,
47}
48
49impl Default for Engine {
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55impl Engine {
56    pub fn new() -> Self {
57        Self::new_with_config(&Config::default(), None)
58    }
59
60    pub fn config(&self) -> Arc<Config> {
61        self.runtime.config().clone()
62    }
63
64    /// engine executor
65    pub fn adapter(&self) -> Arc<Adapter> {
66        self.adapter.clone()
67    }
68
69    /// engine executor
70    pub fn executor(&self) -> Arc<Executor> {
71        Arc::new(Executor::new(&self.runtime))
72    }
73
74    /// event channel (default to not support re-send)
75    pub fn channel(&self) -> Arc<Channel> {
76        Arc::new(Channel::new(&self.runtime))
77    }
78
79    /// create named channel to receive messages
80    /// if setting the emit_id by [`ChannelOptions`] it will check the status and re-send when not acking
81    /// # Example
82    /// ```no_run
83    /// use acts::{ Engine, ChannelOptions };
84    ///
85    /// let engine = Engine::new();
86    /// let chan = engine.channel_with_options(&ChannelOptions {  id: "chan1".to_string(),  ack: true,  r#type: "step".to_string(), key: "my_key*".to_string(), state: "{created, completed}".to_string(), tag: "*".to_string()  });
87    /// chan.on_message(|e| {
88    ///     // do something
89    /// });
90    /// ```
91    pub fn channel_with_options(&self, matcher: &ChannelOptions) -> Arc<Channel> {
92        Arc::new(Channel::channel(&self.runtime, matcher))
93    }
94
95    /// engine extender
96    pub fn extender(&self) -> Arc<Extender> {
97        self.extender.clone()
98    }
99
100    pub(crate) fn runtime(&self) -> Arc<Runtime> {
101        self.runtime.clone()
102    }
103
104    pub(crate) fn plugins(&self) -> Arc<Mutex<Vec<Box<dyn ActPlugin>>>> {
105        self.extender.plugins()
106    }
107
108    /// close engine
109    ///
110    /// ## Example
111    ///
112    /// ```rust,no_run
113    /// use acts::{Engine, Workflow, Vars};
114    /// #[tokio::main]
115    /// async fn main() {
116    ///     let engine = Engine::new();
117    ///     engine.close();
118    /// }
119    /// ```
120    pub fn close(&self) {
121        info!("close");
122        self.runtime.scher().close();
123    }
124
125    pub fn signal<T: Clone>(&self, init: T) -> Signal<T> {
126        Signal::new(init)
127    }
128
129    pub fn is_running(&self) -> bool {
130        self.runtime.is_running()
131    }
132
133    fn init(&self) {
134        info!("init");
135        plugin::init(self);
136        adapter::init(self);
137        self.runtime.init(self);
138    }
139
140    pub(crate) fn new_with_config(config: &Config, store: Option<Arc<dyn StoreAdapter>>) -> Self {
141        info!("config: {:?}", config);
142        let runtime = Runtime::new(config);
143
144        let extender = Arc::new(Extender::new(&runtime));
145        let adapter = Arc::new(Adapter::new());
146        if let Some(store) = &store {
147            adapter.set_store(store.clone());
148        }
149        let engine = Self {
150            runtime,
151            extender,
152            adapter,
153        };
154        engine.init();
155        engine
156    }
157}