acts_next/engine.rs
1use crate::{
2 ChannelOptions, Signal,
3 config::Config,
4 export::{Channel, Executor, Extender},
5 package,
6 scheduler::Runtime,
7};
8
9use std::sync::Arc;
10use tracing::info;
11
12/// Workflow Engine
13///
14/// ## Example:
15/// a example to caculate the result from 1 to given input value
16///
17///```rust,no_run
18/// use acts::{Engine, Workflow, Vars};
19///
20/// #[tokio::main]
21/// async fn main() {
22/// let engine = Engine::new().start();
23///
24/// let model = include_str!("../../examples/simple/model.yml");
25/// let workflow = Workflow::from_yml(model).unwrap();
26///
27/// engine.channel().on_complete(|e| {
28/// println!("{:?}", e.outputs);
29/// });
30/// let exec = engine.executor();
31/// exec.model().deploy(&workflow).expect("fail to deploy workflow");
32///
33/// let mut vars = Vars::new();
34/// vars.insert("input".into(), 3.into());
35/// vars.insert("pid".into(), "test1".into());
36/// exec.proc().start(
37/// &workflow.id,
38/// &vars);
39/// }
40/// ```
41#[derive(Clone)]
42pub struct Engine {
43 runtime: Arc<Runtime>,
44 extender: Arc<Extender>,
45}
46
47impl Default for Engine {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52
53impl Engine {
54 pub fn new() -> Self {
55 Self::new_with_config(&Config::default())
56 }
57
58 pub fn config(&self) -> Arc<Config> {
59 self.runtime.config().clone()
60 }
61
62 /// engine executor
63 pub fn executor(&self) -> Arc<Executor> {
64 Arc::new(Executor::new(&self.runtime))
65 }
66
67 /// event channel (default to not support re-send)
68 pub fn channel(&self) -> Arc<Channel> {
69 Arc::new(Channel::new(&self.runtime))
70 }
71
72 /// create named channel to receive messages
73 /// if setting the emit_id by [`ChannelOptions`] it will check the status and re-send when not acking
74 /// # Example
75 /// ```no_run
76 /// use acts::{ Engine, ChannelOptions };
77 ///
78 /// let engine = Engine::new().start();
79 /// let chan = engine.channel_with_options(&ChannelOptions {
80 /// id: "chan1".to_string(),
81 /// ack: true,
82 /// r#type: "step".to_string(),
83 /// key: "my_key*".to_string(),
84 /// state: "{created, completed}".to_string(),
85 /// uses: "my_package".to_string(),
86 /// tag: "*".to_string()
87 /// });
88 /// chan.on_message(|e| {
89 /// // do something
90 /// });
91 /// ```
92 pub fn channel_with_options(&self, matcher: &ChannelOptions) -> Arc<Channel> {
93 Arc::new(Channel::channel(&self.runtime, matcher))
94 }
95
96 /// engine extender
97 pub fn extender(&self) -> Arc<Extender> {
98 self.extender.clone()
99 }
100
101 pub(crate) fn runtime(&self) -> Arc<Runtime> {
102 self.runtime.clone()
103 }
104
105 /// close engine
106 ///
107 /// ## Example
108 ///
109 /// ```rust,no_run
110 /// use acts::{Engine, Workflow, Vars};
111 /// #[tokio::main]
112 /// async fn main() {
113 /// let engine = Engine::new().start();
114 /// engine.close();
115 /// }
116 /// ```
117 pub fn close(&self) {
118 info!("close");
119 self.runtime.scher().close();
120 }
121
122 pub fn signal<T: Clone>(&self, init: T) -> Signal<T> {
123 Signal::new(init)
124 }
125
126 pub fn is_running(&self) -> bool {
127 self.runtime.is_running()
128 }
129
130 fn init(&self) {
131 info!("init");
132 self.runtime.init(self);
133 package::init(self);
134 }
135
136 pub(crate) fn new_with_config(config: &Config) -> Self {
137 info!("config: {:?}", config);
138 let runtime = Runtime::new(config);
139
140 let extender = Arc::new(Extender::new(&runtime));
141 Self { runtime, extender }
142 }
143
144 pub fn start(self) -> Self {
145 self.init();
146 self
147 }
148}