Skip to main content

phlow_runtime/
runtime.rs

1use crate::loader::{Loader, load_module};
2#[cfg(target_env = "gnu")]
3use crate::memory::force_memory_release;
4use crate::settings::Settings;
5use crossbeam::channel;
6use futures::future::join_all;
7use log::{debug, error, info};
8use phlow_engine::phs::{Script, ScriptError, build_engine};
9use phlow_engine::{Context, Phlow};
10use phlow_sdk::structs::Package;
11use phlow_sdk::tokio;
12use phlow_sdk::{
13    prelude::Value,
14    structs::{ModulePackage, ModuleSetup, Modules},
15    tracing::{self, Dispatch, dispatcher},
16};
17use std::fmt::Display;
18use std::sync::Arc;
19#[cfg(target_env = "gnu")]
20use std::thread;
21use tokio::sync::oneshot;
22
/// Errors surfaced by the runtime while loading modules or driving a flow.
#[derive(Debug)]
pub enum RuntimeError {
    /// Building or evaluating a module's `with` script failed.
    ModuleWithError(ScriptError),
    /// A module's setup channel was closed before the module reported whether
    /// it registered.
    ModuleRegisterError,
    /// The flow could not be built or started; the message says why (for
    /// example an unparsable CLI value, an unknown start step, or a package
    /// that could not be queued).
    FlowExecutionError(String),
}
29
30impl Display for RuntimeError {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        match self {
33            RuntimeError::ModuleRegisterError => write!(f, "Module register error"),
34            RuntimeError::FlowExecutionError(err) => write!(f, "Flow execution error: {}", err),
35            RuntimeError::ModuleWithError(err) => write!(f, "Module with error: {}", err),
36        }
37    }
38}
39
40fn parse_cli_value(flag: &str, value: &str) -> Result<Value, RuntimeError> {
41    match Value::json_to_value(value) {
42        Ok(parsed) => Ok(parsed),
43        Err(err) => {
44            error!("Failed to parse --{} value '{}': {:?}", flag, value, err);
45            Err(RuntimeError::FlowExecutionError(format!(
46                "Failed to parse --{} value: {:?}",
47                flag, err
48            )))
49        }
50    }
51}
52
/// Field-less type that groups the runtime entry points; every method in its
/// `impl` is an associated function, so no instance is ever needed.
pub struct Runtime {}
54
55impl Runtime {
56    async fn load_modules(
57        loader: Loader,
58        dispatch: Dispatch,
59        settings: Settings,
60        tx_main_package: channel::Sender<Package>,
61    ) -> Result<Modules, RuntimeError> {
62        let mut modules = Modules::default();
63        let engine = build_engine(None);
64        // -------------------------
65        // Load the modules
66        // -------------------------
67        let app_data = loader.app_data.clone();
68        let loader_main_id = loader.main.clone();
69
70        for (id, module) in loader.modules.into_iter().enumerate() {
71            let (setup_sender, setup_receive) =
72                oneshot::channel::<Option<channel::Sender<ModulePackage>>>();
73
74            // Se --var-main foi especificado, não permitir que módulos principais sejam executados
75            let main_sender = if loader_main_id == id as i32 && settings.var_main.is_none() {
76                Some(tx_main_package.clone())
77            } else {
78                None
79            };
80
81            let with = {
82                let script = match Script::try_build(engine.clone(), &module.with) {
83                    Ok(payload) => payload,
84                    Err(err) => return Err(RuntimeError::ModuleWithError(err)),
85                };
86
87                let with: Value = script
88                    .evaluate_without_context()
89                    .map_err(|err| RuntimeError::ModuleWithError(err))?;
90
91                log::debug!(
92                    "Module '{}' with: {}",
93                    module.name,
94                    with.to_json(phlow_sdk::prelude::JsonMode::Indented)
95                ); // Debug print
96                with
97            };
98
99            let setup = ModuleSetup {
100                id,
101                setup_sender,
102                main_sender,
103                with,
104                dispatch: dispatch.clone(),
105                app_data: app_data.clone(),
106                is_test_mode: false,
107            };
108
109            let module_target = module.module.clone();
110            let module_version = module.version.clone();
111            let local_path = module.local_path.clone();
112            let settings = settings.clone();
113
114            std::thread::spawn(move || {
115                let result: Result<(), crate::loader::error::Error> =
116                    load_module(setup, &module_target, &module_version, local_path, settings);
117
118                if let Err(err) = result {
119                    error!("Runtime Error Load Module: {:?}", err)
120                }
121            });
122
123            debug!(
124                "Module {} loaded with name \"{}\" and version \"{}\"",
125                module.module, module.name, module.version
126            );
127
128            match setup_receive.await {
129                Ok(Some(sender)) => {
130                    debug!("Module {} registered", module.name);
131                    modules.register(module, sender);
132                }
133                Ok(None) => {
134                    debug!("Module {} did not register", module.name);
135                }
136                Err(_) => {
137                    return Err(RuntimeError::ModuleRegisterError);
138                }
139            }
140        }
141
142        Ok(modules)
143    }
144
145    async fn listener(
146        rx_main_package: channel::Receiver<Package>,
147        steps: Value,
148        modules: Modules,
149        settings: Settings,
150        default_context: Option<Context>,
151    ) -> Result<(), RuntimeError> {
152        let phlow = Arc::new({
153            match Phlow::try_from_value(&steps, Some(Arc::new(modules))) {
154                Ok(phlow) => phlow,
155                Err(err) => return Err(RuntimeError::FlowExecutionError(err.to_string())),
156            }
157        });
158        if let Some(controller) = phlow_engine::debug::debug_controller() {
159            controller.set_script(phlow.script()).await;
160        }
161
162        let start_step = if let Some(step_id) = settings.start_step.as_deref() {
163            match phlow.find_step_reference(step_id) {
164                Some(step_ref) => Some(step_ref),
165                None => {
166                    return Err(RuntimeError::FlowExecutionError(format!(
167                        "Step id '{}' not found",
168                        step_id
169                    )));
170                }
171            }
172        } else {
173            None
174        };
175
176        drop(steps);
177
178        let mut handles = Vec::new();
179        let default_context = default_context.clone();
180
181        for _i in 0..settings.package_consumer_count {
182            let rx_main_pkg = rx_main_package.clone();
183            let phlow = phlow.clone();
184            let default_context = default_context.clone();
185            let start_step = start_step.clone();
186
187            let handle = tokio::task::spawn_blocking(move || {
188                for mut main_package in rx_main_pkg {
189                    let phlow = phlow.clone();
190                    let parent = match main_package.span.clone() {
191                        Some(span) => span,
192                        None => {
193                            error!("Span not found in main module");
194                            continue;
195                        }
196                    };
197                    let dispatch = match main_package.dispatch.clone() {
198                        Some(dispatch) => dispatch,
199                        None => {
200                            error!("Dispatch not found in main module");
201                            continue;
202                        }
203                    };
204
205                    let mut context = {
206                        let data = main_package.get_data().cloned().unwrap_or(Value::Null);
207                        if let Some(mut context) = default_context.clone() {
208                            context.set_main(data);
209                            context
210                        } else {
211                            Context::from_main(data)
212                        }
213                    };
214                    let start_step = start_step.clone();
215
216                    tokio::task::block_in_place(move || {
217                        dispatcher::with_default(&dispatch, || {
218                            let _enter = parent.enter();
219                            let rt = tokio::runtime::Handle::current();
220
221                            rt.block_on(async {
222                                let result = if let Some(step_ref) = start_step.clone() {
223                                    phlow.execute_from(&mut context, step_ref).await
224                                } else {
225                                    phlow.execute(&mut context).await
226                                };
227                                match result {
228                                    Ok(result) => {
229                                        let result_value = result.unwrap_or(Value::Undefined);
230                                        main_package.send(result_value);
231                                    }
232                                    Err(err) => {
233                                        error!("Runtime Error Execute Steps: {:?}", err);
234                                    }
235                                }
236                            });
237                        });
238                    });
239                }
240            });
241
242            handles.push(handle);
243        }
244
245        join_all(handles).await;
246
247        Ok(())
248    }
249
250    pub async fn run(
251        loader: Loader,
252        dispatch: Dispatch,
253        settings: Settings,
254    ) -> Result<(), RuntimeError> {
255        // -------------------------
256        // Create the channels
257        // -------------------------
258        let (tx_main_package, rx_main_package) = channel::unbounded::<Package>();
259
260        let var_payload_value = match &settings.var_payload {
261            Some(var_payload_str) => Some(parse_cli_value("var-payload", var_payload_str)?),
262            None => None,
263        };
264        let default_context = var_payload_value.as_ref().map(|payload| {
265            let mut context = Context::new();
266            context.add_step_payload(Some(payload.clone()));
267            context
268        });
269
270        let no_main = loader.main == -1 || settings.var_main.is_some();
271        let steps = loader.get_steps();
272        let modules = Self::load_modules(
273            loader,
274            dispatch.clone(),
275            settings.clone(),
276            tx_main_package.clone(),
277        )
278        .await?;
279
280        // Se não há main definido ou --var-main foi especificado, forçar o início dos steps
281        if no_main {
282            // Criar um span padrão para o início dos steps
283            let span = tracing::span!(
284                tracing::Level::INFO,
285                "auto_start_steps",
286                otel.name = "phlow auto start"
287            );
288
289            // Se --var-main foi especificado, processar o valor usando valu3
290            let request_data = if let Some(var_main_str) = &settings.var_main {
291                // Usar valu3 para processar o valor da mesma forma que outros valores
292                Some(parse_cli_value("var-main", var_main_str)?)
293            } else {
294                None
295            };
296
297            // Enviar um pacote com os dados do --var-main para iniciar os steps
298            let package = Package {
299                response: None,
300                request_data,
301                origin: 0,
302                span: Some(span),
303                dispatch: Some(dispatch.clone()),
304            };
305
306            if let Err(err) = tx_main_package.send(package) {
307                error!("Failed to send package: {:?}", err);
308                return Err(RuntimeError::FlowExecutionError(
309                    "Failed to send package".to_string(),
310                ));
311            }
312
313            if settings.var_main.is_some() {
314                info!("Using --var-main to simulate main module output");
315            }
316        }
317
318        drop(tx_main_package);
319
320        #[cfg(target_env = "gnu")]
321        if settings.garbage_collection {
322            thread::spawn(move || {
323                loop {
324                    thread::sleep(std::time::Duration::from_secs(
325                        settings.garbage_collection_interval,
326                    ));
327                    force_memory_release(settings.min_allocated_memory);
328                }
329            });
330        }
331
332        info!("Phlow!");
333
334        // -------------------------
335        // Create the phlow
336        // -------------------------
337        Self::listener(rx_main_package, steps, modules, settings, default_context)
338            .await
339            .map_err(|err| {
340                error!("Runtime Error: {:?}", err);
341                err
342            })?;
343
344        Ok(())
345    }
346
347    pub async fn run_script(
348        tx_main_package: channel::Sender<Package>,
349        rx_main_package: channel::Receiver<Package>,
350        loader: Loader,
351        dispatch: Dispatch,
352        settings: Settings,
353        context: Context,
354    ) -> Result<(), RuntimeError> {
355        let steps = loader.get_steps();
356        let context = if let Some(var_payload_str) = &settings.var_payload {
357            let payload = parse_cli_value("var-payload", var_payload_str)?;
358            context.clone_with_output(payload)
359        } else {
360            context
361        };
362
363        let modules = Self::load_modules(
364            loader,
365            dispatch.clone(),
366            settings.clone(),
367            tx_main_package.clone(),
368        )
369        .await?;
370
371        drop(tx_main_package);
372
373        Self::listener(rx_main_package, steps, modules, settings, Some(context))
374            .await
375            .map_err(|err| {
376                error!("Runtime Error: {:?}", err);
377                err
378            })?;
379
380        Ok(())
381    }
382}