1use crate::loader::{Loader, load_module};
2#[cfg(target_env = "gnu")]
3use crate::memory::force_memory_release;
4use crate::settings::Settings;
5use crossbeam::channel;
6use futures::future::join_all;
7use log::{debug, error, info};
8use phlow_engine::phs::{Script, ScriptError, build_engine};
9use phlow_engine::{Context, Phlow};
10use phlow_sdk::structs::Package;
11use phlow_sdk::tokio;
12use phlow_sdk::{
13 prelude::Value,
14 structs::{ModulePackage, ModuleSetup, Modules},
15 tracing::{self, Dispatch, dispatcher},
16};
17use std::fmt::Display;
18use std::sync::Arc;
19#[cfg(target_env = "gnu")]
20use std::thread;
21use tokio::sync::oneshot;
22
23#[derive(Debug)]
24pub enum RuntimeError {
25 ModuleWithError(ScriptError),
26 ModuleRegisterError,
27 FlowExecutionError(String),
28}
29
30impl Display for RuntimeError {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 match self {
33 RuntimeError::ModuleRegisterError => write!(f, "Module register error"),
34 RuntimeError::FlowExecutionError(err) => write!(f, "Flow execution error: {}", err),
35 RuntimeError::ModuleWithError(err) => write!(f, "Module with error: {}", err),
36 }
37 }
38}
39
40fn parse_cli_value(flag: &str, value: &str) -> Result<Value, RuntimeError> {
41 match Value::json_to_value(value) {
42 Ok(parsed) => Ok(parsed),
43 Err(err) => {
44 error!("Failed to parse --{} value '{}': {:?}", flag, value, err);
45 Err(RuntimeError::FlowExecutionError(format!(
46 "Failed to parse --{} value: {:?}",
47 flag, err
48 )))
49 }
50 }
51}
52
53pub struct Runtime {}
54
55impl Runtime {
56 async fn load_modules(
57 loader: Loader,
58 dispatch: Dispatch,
59 settings: Settings,
60 tx_main_package: channel::Sender<Package>,
61 ) -> Result<Modules, RuntimeError> {
62 let mut modules = Modules::default();
63 let engine = build_engine(None);
64 let app_data = loader.app_data.clone();
68 let loader_main_id = loader.main.clone();
69
70 for (id, module) in loader.modules.into_iter().enumerate() {
71 let (setup_sender, setup_receive) =
72 oneshot::channel::<Option<channel::Sender<ModulePackage>>>();
73
74 let main_sender = if loader_main_id == id as i32 && settings.var_main.is_none() {
76 Some(tx_main_package.clone())
77 } else {
78 None
79 };
80
81 let with = {
82 let script = match Script::try_build(engine.clone(), &module.with) {
83 Ok(payload) => payload,
84 Err(err) => return Err(RuntimeError::ModuleWithError(err)),
85 };
86
87 let with: Value = script
88 .evaluate_without_context()
89 .map_err(|err| RuntimeError::ModuleWithError(err))?;
90
91 log::debug!(
92 "Module '{}' with: {}",
93 module.name,
94 with.to_json(phlow_sdk::prelude::JsonMode::Indented)
95 ); with
97 };
98
99 let setup = ModuleSetup {
100 id,
101 setup_sender,
102 main_sender,
103 with,
104 dispatch: dispatch.clone(),
105 app_data: app_data.clone(),
106 is_test_mode: false,
107 };
108
109 let module_target = module.module.clone();
110 let module_version = module.version.clone();
111 let local_path = module.local_path.clone();
112 let settings = settings.clone();
113
114 std::thread::spawn(move || {
115 let result: Result<(), crate::loader::error::Error> =
116 load_module(setup, &module_target, &module_version, local_path, settings);
117
118 if let Err(err) = result {
119 error!("Runtime Error Load Module: {:?}", err)
120 }
121 });
122
123 debug!(
124 "Module {} loaded with name \"{}\" and version \"{}\"",
125 module.module, module.name, module.version
126 );
127
128 match setup_receive.await {
129 Ok(Some(sender)) => {
130 debug!("Module {} registered", module.name);
131 modules.register(module, sender);
132 }
133 Ok(None) => {
134 debug!("Module {} did not register", module.name);
135 }
136 Err(_) => {
137 return Err(RuntimeError::ModuleRegisterError);
138 }
139 }
140 }
141
142 Ok(modules)
143 }
144
145 async fn listener(
146 rx_main_package: channel::Receiver<Package>,
147 steps: Value,
148 modules: Modules,
149 settings: Settings,
150 default_context: Option<Context>,
151 ) -> Result<(), RuntimeError> {
152 let phlow = Arc::new({
153 match Phlow::try_from_value(&steps, Some(Arc::new(modules))) {
154 Ok(phlow) => phlow,
155 Err(err) => return Err(RuntimeError::FlowExecutionError(err.to_string())),
156 }
157 });
158 if let Some(controller) = phlow_engine::debug::debug_controller() {
159 controller.set_script(phlow.script()).await;
160 }
161
162 let start_step = if let Some(step_id) = settings.start_step.as_deref() {
163 match phlow.find_step_reference(step_id) {
164 Some(step_ref) => Some(step_ref),
165 None => {
166 return Err(RuntimeError::FlowExecutionError(format!(
167 "Step id '{}' not found",
168 step_id
169 )));
170 }
171 }
172 } else {
173 None
174 };
175
176 drop(steps);
177
178 let mut handles = Vec::new();
179 let default_context = default_context.clone();
180
181 for _i in 0..settings.package_consumer_count {
182 let rx_main_pkg = rx_main_package.clone();
183 let phlow = phlow.clone();
184 let default_context = default_context.clone();
185 let start_step = start_step.clone();
186
187 let handle = tokio::task::spawn_blocking(move || {
188 for mut main_package in rx_main_pkg {
189 let phlow = phlow.clone();
190 let parent = match main_package.span.clone() {
191 Some(span) => span,
192 None => {
193 error!("Span not found in main module");
194 continue;
195 }
196 };
197 let dispatch = match main_package.dispatch.clone() {
198 Some(dispatch) => dispatch,
199 None => {
200 error!("Dispatch not found in main module");
201 continue;
202 }
203 };
204
205 let mut context = {
206 let data = main_package.get_data().cloned().unwrap_or(Value::Null);
207 if let Some(mut context) = default_context.clone() {
208 context.set_main(data);
209 context
210 } else {
211 Context::from_main(data)
212 }
213 };
214 let start_step = start_step.clone();
215
216 tokio::task::block_in_place(move || {
217 dispatcher::with_default(&dispatch, || {
218 let _enter = parent.enter();
219 let rt = tokio::runtime::Handle::current();
220
221 rt.block_on(async {
222 let result = if let Some(step_ref) = start_step.clone() {
223 phlow.execute_from(&mut context, step_ref).await
224 } else {
225 phlow.execute(&mut context).await
226 };
227 match result {
228 Ok(result) => {
229 let result_value = result.unwrap_or(Value::Undefined);
230 main_package.send(result_value);
231 }
232 Err(err) => {
233 error!("Runtime Error Execute Steps: {:?}", err);
234 }
235 }
236 });
237 });
238 });
239 }
240 });
241
242 handles.push(handle);
243 }
244
245 join_all(handles).await;
246
247 Ok(())
248 }
249
250 pub async fn run(
251 loader: Loader,
252 dispatch: Dispatch,
253 settings: Settings,
254 ) -> Result<(), RuntimeError> {
255 let (tx_main_package, rx_main_package) = channel::unbounded::<Package>();
259
260 let var_payload_value = match &settings.var_payload {
261 Some(var_payload_str) => Some(parse_cli_value("var-payload", var_payload_str)?),
262 None => None,
263 };
264 let default_context = var_payload_value.as_ref().map(|payload| {
265 let mut context = Context::new();
266 context.add_step_payload(Some(payload.clone()));
267 context
268 });
269
270 let no_main = loader.main == -1 || settings.var_main.is_some();
271 let steps = loader.get_steps();
272 let modules = Self::load_modules(
273 loader,
274 dispatch.clone(),
275 settings.clone(),
276 tx_main_package.clone(),
277 )
278 .await?;
279
280 if no_main {
282 let span = tracing::span!(
284 tracing::Level::INFO,
285 "auto_start_steps",
286 otel.name = "phlow auto start"
287 );
288
289 let request_data = if let Some(var_main_str) = &settings.var_main {
291 Some(parse_cli_value("var-main", var_main_str)?)
293 } else {
294 None
295 };
296
297 let package = Package {
299 response: None,
300 request_data,
301 origin: 0,
302 span: Some(span),
303 dispatch: Some(dispatch.clone()),
304 };
305
306 if let Err(err) = tx_main_package.send(package) {
307 error!("Failed to send package: {:?}", err);
308 return Err(RuntimeError::FlowExecutionError(
309 "Failed to send package".to_string(),
310 ));
311 }
312
313 if settings.var_main.is_some() {
314 info!("Using --var-main to simulate main module output");
315 }
316 }
317
318 drop(tx_main_package);
319
320 #[cfg(target_env = "gnu")]
321 if settings.garbage_collection {
322 thread::spawn(move || {
323 loop {
324 thread::sleep(std::time::Duration::from_secs(
325 settings.garbage_collection_interval,
326 ));
327 force_memory_release(settings.min_allocated_memory);
328 }
329 });
330 }
331
332 info!("Phlow!");
333
334 Self::listener(rx_main_package, steps, modules, settings, default_context)
338 .await
339 .map_err(|err| {
340 error!("Runtime Error: {:?}", err);
341 err
342 })?;
343
344 Ok(())
345 }
346
347 pub async fn run_script(
348 tx_main_package: channel::Sender<Package>,
349 rx_main_package: channel::Receiver<Package>,
350 loader: Loader,
351 dispatch: Dispatch,
352 settings: Settings,
353 context: Context,
354 ) -> Result<(), RuntimeError> {
355 let steps = loader.get_steps();
356 let context = if let Some(var_payload_str) = &settings.var_payload {
357 let payload = parse_cli_value("var-payload", var_payload_str)?;
358 context.clone_with_output(payload)
359 } else {
360 context
361 };
362
363 let modules = Self::load_modules(
364 loader,
365 dispatch.clone(),
366 settings.clone(),
367 tx_main_package.clone(),
368 )
369 .await?;
370
371 drop(tx_main_package);
372
373 Self::listener(rx_main_package, steps, modules, settings, Some(context))
374 .await
375 .map_err(|err| {
376 error!("Runtime Error: {:?}", err);
377 err
378 })?;
379
380 Ok(())
381 }
382}