acts_next/engine.rs
1use crate::{
2 adapter::{self, Adapter},
3 config::Config,
4 export::{Channel, Executor, Extender},
5 plugin,
6 scheduler::Runtime,
7 ActPlugin, ChannelOptions, Signal, StoreAdapter,
8};
9
10use std::sync::{Arc, Mutex};
11use tracing::info;
12
/// Workflow Engine
///
/// ## Example:
/// An example that calculates the result from 1 to the given input value.
///
/// ```rust,no_run
/// use acts::{Engine, Workflow, Vars};
///
/// #[tokio::main]
/// async fn main() {
///     let engine = Engine::new();
///
///     let model = include_str!("../examples/simple/model.yml");
///     let workflow = Workflow::from_yml(model).unwrap();
///
///     engine.channel().on_complete(|e| {
///         println!("{:?}", e.outputs);
///     });
///     let exec = engine.executor();
///     exec.model().deploy(&workflow).expect("fail to deploy workflow");
///
///     let mut vars = Vars::new();
///     vars.insert("input".into(), 3.into());
///     vars.insert("pid".into(), "test1".into());
///     exec.proc().start(
///         &workflow.id,
///         &vars);
/// }
/// ```
#[derive(Clone)]
pub struct Engine {
    /// Shared runtime; executors and channels are created from it.
    runtime: Arc<Runtime>,
    /// Shared adapter handle returned by `adapter()`.
    adapter: Arc<Adapter>,
    /// Shared extender handle returned by `extender()`; also the source of the plugin list.
    extender: Arc<Extender>,
}
48
49impl Default for Engine {
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55impl Engine {
56 pub fn new() -> Self {
57 Self::new_with_config(&Config::default(), None)
58 }
59
60 pub fn config(&self) -> Arc<Config> {
61 self.runtime.config().clone()
62 }
63
64 /// engine executor
65 pub fn adapter(&self) -> Arc<Adapter> {
66 self.adapter.clone()
67 }
68
69 /// engine executor
70 pub fn executor(&self) -> Arc<Executor> {
71 Arc::new(Executor::new(&self.runtime))
72 }
73
74 /// event channel (default to not support re-send)
75 pub fn channel(&self) -> Arc<Channel> {
76 Arc::new(Channel::new(&self.runtime))
77 }
78
79 /// create named channel to receive messages
80 /// if setting the emit_id by [`ChannelOptions`] it will check the status and re-send when not acking
81 /// # Example
82 /// ```no_run
83 /// use acts::{ Engine, ChannelOptions };
84 ///
85 /// let engine = Engine::new();
86 /// let chan = engine.channel_with_options(&ChannelOptions { id: "chan1".to_string(), ack: true, r#type: "step".to_string(), key: "my_key*".to_string(), state: "{created, completed}".to_string(), tag: "*".to_string() });
87 /// chan.on_message(|e| {
88 /// // do something
89 /// });
90 /// ```
91 pub fn channel_with_options(&self, matcher: &ChannelOptions) -> Arc<Channel> {
92 Arc::new(Channel::channel(&self.runtime, matcher))
93 }
94
95 /// engine extender
96 pub fn extender(&self) -> Arc<Extender> {
97 self.extender.clone()
98 }
99
100 pub(crate) fn runtime(&self) -> Arc<Runtime> {
101 self.runtime.clone()
102 }
103
104 pub(crate) fn plugins(&self) -> Arc<Mutex<Vec<Box<dyn ActPlugin>>>> {
105 self.extender.plugins()
106 }
107
108 /// close engine
109 ///
110 /// ## Example
111 ///
112 /// ```rust,no_run
113 /// use acts::{Engine, Workflow, Vars};
114 /// #[tokio::main]
115 /// async fn main() {
116 /// let engine = Engine::new();
117 /// engine.close();
118 /// }
119 /// ```
120 pub fn close(&self) {
121 info!("close");
122 self.runtime.scher().close();
123 }
124
125 pub fn signal<T: Clone>(&self, init: T) -> Signal<T> {
126 Signal::new(init)
127 }
128
129 pub fn is_running(&self) -> bool {
130 self.runtime.is_running()
131 }
132
133 fn init(&self) {
134 info!("init");
135 plugin::init(self);
136 adapter::init(self);
137 self.runtime.init(self);
138 }
139
140 pub(crate) fn new_with_config(config: &Config, store: Option<Arc<dyn StoreAdapter>>) -> Self {
141 info!("config: {:?}", config);
142 let runtime = Runtime::new(config);
143
144 let extender = Arc::new(Extender::new(&runtime));
145 let adapter = Arc::new(Adapter::new());
146 if let Some(store) = &store {
147 adapter.set_store(store.clone());
148 }
149 let engine = Self {
150 runtime,
151 extender,
152 adapter,
153 };
154 engine.init();
155 engine
156 }
157}