Skip to main content

phlow_runtime/
runtime.rs

1use crate::loader::{Loader, load_module};
2use crate::inline_module::{InlineModules, PhlowModuleRequest};
3#[cfg(target_env = "gnu")]
4use crate::memory::force_memory_release;
5use crate::settings::Settings;
6use crossbeam::channel;
7use futures::future::join_all;
8use log::{debug, error, info, warn};
9use phlow_engine::phs::{Script, ScriptError, build_engine};
10use phlow_engine::{Context, Phlow};
11use phlow_sdk::structs::Package;
12use phlow_sdk::tokio;
13use phlow_sdk::{
14    prelude::{Array, Value},
15    structs::{ModulePackage, ModuleSetup, Modules},
16    tracing::{self, Dispatch, dispatcher},
17};
18use std::collections::HashSet;
19use std::fmt::Display;
20use std::sync::Arc;
21use std::thread;
22use tokio::sync::oneshot;
23
/// Errors the runtime reports while loading modules or executing a flow.
#[derive(Debug)]
pub enum RuntimeError {
    /// A module's `with` script failed to build or to evaluate.
    ModuleWithError(ScriptError),
    /// A module's setup channel was closed before the module answered.
    ModuleRegisterError,
    /// The flow could not be prepared or started (for example an unparsable
    /// CLI value, an invalid flow definition, or an unknown start step).
    FlowExecutionError(String),
    /// An inline module was misconfigured or failed to register.
    InlineModuleError(String),
}
31
32impl Display for RuntimeError {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        match self {
35            RuntimeError::ModuleRegisterError => write!(f, "Module register error"),
36            RuntimeError::FlowExecutionError(err) => write!(f, "Flow execution error: {}", err),
37            RuntimeError::ModuleWithError(err) => write!(f, "Module with error: {}", err),
38            RuntimeError::InlineModuleError(err) => write!(f, "Inline module error: {}", err),
39        }
40    }
41}
42
43fn parse_cli_value(flag: &str, value: &str) -> Result<Value, RuntimeError> {
44    match Value::json_to_value(value) {
45        Ok(parsed) => Ok(parsed),
46        Err(err) => {
47            error!("Failed to parse --{} value '{}': {:?}", flag, value, err);
48            Err(RuntimeError::FlowExecutionError(format!(
49                "Failed to parse --{} value: {:?}",
50                flag, err
51            )))
52        }
53    }
54}
55
/// Stateless namespace for the runtime entry points (`run`, `run_script`,
/// `run_script_with_modules`); it has no fields.
pub struct Runtime {}
57
58fn spawn_inline_module_worker(
59    name: String,
60    handler: crate::inline_module::PhlowModuleHandler,
61    with: Value,
62    app_data: phlow_sdk::structs::ApplicationData,
63    dispatch: Dispatch,
64    runtime_handle: tokio::runtime::Handle,
65    receiver: channel::Receiver<ModulePackage>,
66) {
67    thread::spawn(move || {
68        for package in receiver {
69            let request = PhlowModuleRequest {
70                input: package.input(),
71                payload: package.payload(),
72                with: with.clone(),
73                app_data: app_data.clone(),
74                dispatch: dispatch.clone(),
75            };
76
77            let response = dispatcher::with_default(&dispatch, || {
78                runtime_handle.block_on((handler)(request))
79            });
80
81            if package.sender.send(response).is_err() {
82                debug!("Inline module '{}' response channel closed", name);
83            }
84        }
85
86        debug!("Inline module '{}' stopped", name);
87    });
88}
89
90impl Runtime {
91    async fn load_modules(
92        loader: Loader,
93        dispatch: Dispatch,
94        settings: Settings,
95        tx_main_package: channel::Sender<Package>,
96        inline_modules: &InlineModules,
97    ) -> Result<Modules, RuntimeError> {
98        let mut modules = Modules::default();
99        let engine = build_engine(None);
100        let runtime_handle = tokio::runtime::Handle::current();
101        // -------------------------
102        // Load the modules
103        // -------------------------
104        let app_data = loader.app_data.clone();
105        let loader_main_id = loader.main.clone();
106        let mut unused_inline: HashSet<String> = inline_modules.keys().cloned().collect();
107
108        for (id, module) in loader.modules.into_iter().enumerate() {
109            let (setup_sender, setup_receive) =
110                oneshot::channel::<Option<channel::Sender<ModulePackage>>>();
111
112            let is_main = loader_main_id == id as i32;
113            // Se --var-main foi especificado, não permitir que módulos principais sejam executados
114            let main_sender = if is_main && settings.var_main.is_none() {
115                Some(tx_main_package.clone())
116            } else {
117                None
118            };
119
120            let with = {
121                let script = match Script::try_build(engine.clone(), &module.with) {
122                    Ok(payload) => payload,
123                    Err(err) => return Err(RuntimeError::ModuleWithError(err)),
124                };
125
126                let with: Value = script
127                    .evaluate_without_context()
128                    .map_err(|err| RuntimeError::ModuleWithError(err))?;
129
130                log::debug!(
131                    "Module '{}' with: {}",
132                    module.name,
133                    with.to_json(phlow_sdk::prelude::JsonMode::Indented)
134                ); // Debug print
135                with
136            };
137
138            let inline_module = inline_modules.get(&module.name).cloned();
139            if inline_module.is_some() {
140                unused_inline.remove(&module.name);
141            }
142
143            if inline_module.is_some() && is_main && settings.var_main.is_none() {
144                return Err(RuntimeError::InlineModuleError(format!(
145                    "Inline module '{}' is declared as main, but runtime is waiting for main output",
146                    module.name
147                )));
148            }
149
150            let mut module_data = module;
151            let mut inline_worker = None;
152
153            if let Some(inline_module) = inline_module {
154                let handler = inline_module.handler().ok_or_else(|| {
155                    RuntimeError::InlineModuleError(format!(
156                        "Inline module '{}' is missing a handler",
157                        module_data.name
158                    ))
159                })?;
160
161                let schema = inline_module.schema();
162                if !schema.input.is_null() {
163                    module_data.input = schema.input.clone();
164                }
165                if !schema.output.is_null() {
166                    module_data.output = schema.output.clone();
167                }
168                if !schema.input_order.is_empty() {
169                    module_data.input_order = Value::Array(Array::from(schema.input_order.clone()));
170                }
171                module_data.with = with.clone();
172
173                let (sender, receiver) = channel::unbounded::<ModulePackage>();
174                if setup_sender.send(Some(sender)).is_err() {
175                    return Err(RuntimeError::InlineModuleError(format!(
176                        "Inline module '{}' failed to register",
177                        module_data.name
178                    )));
179                }
180
181                inline_worker = Some((
182                    module_data.name.clone(),
183                    handler,
184                    with,
185                    app_data.clone(),
186                    dispatch.clone(),
187                    runtime_handle.clone(),
188                    receiver,
189                ));
190            } else {
191                let setup = ModuleSetup {
192                    id,
193                    setup_sender,
194                    main_sender,
195                    with,
196                    dispatch: dispatch.clone(),
197                    app_data: app_data.clone(),
198                    is_test_mode: false,
199                };
200
201                let module_target = module_data.module.clone();
202                let module_version = module_data.version.clone();
203                let local_path = module_data.local_path.clone();
204                let settings = settings.clone();
205
206                thread::spawn(move || {
207                    let result: Result<(), crate::loader::error::Error> =
208                        load_module(setup, &module_target, &module_version, local_path, settings);
209
210                    if let Err(err) = result {
211                        error!("Runtime Error Load Module: {:?}", err)
212                    }
213                });
214
215                debug!(
216                    "Module {} loaded with name \"{}\" and version \"{}\"",
217                    module_data.module, module_data.name, module_data.version
218                );
219            }
220
221            match setup_receive.await {
222                Ok(Some(sender)) => {
223                    debug!("Module {} registered", module_data.name);
224                    modules.register(module_data, sender);
225                }
226                Ok(None) => {
227                    debug!("Module {} did not register", module_data.name);
228                }
229                Err(_) => {
230                    return Err(RuntimeError::ModuleRegisterError);
231                }
232            }
233
234            if let Some((
235                name,
236                handler,
237                with,
238                app_data,
239                dispatch,
240                runtime_handle,
241                receiver,
242            )) = inline_worker
243            {
244                spawn_inline_module_worker(
245                    name,
246                    handler,
247                    with,
248                    app_data,
249                    dispatch,
250                    runtime_handle,
251                    receiver,
252                );
253            }
254        }
255
256        if !unused_inline.is_empty() {
257            warn!(
258                "Inline modules not declared in pipeline: {}",
259                unused_inline.into_iter().collect::<Vec<_>>().join(", ")
260            );
261        }
262
263        Ok(modules)
264    }
265
266    async fn listener(
267        rx_main_package: channel::Receiver<Package>,
268        steps: Value,
269        modules: Modules,
270        settings: Settings,
271        default_context: Option<Context>,
272    ) -> Result<(), RuntimeError> {
273        let phlow = Arc::new({
274            match Phlow::try_from_value(&steps, Some(Arc::new(modules))) {
275                Ok(phlow) => phlow,
276                Err(err) => return Err(RuntimeError::FlowExecutionError(err.to_string())),
277            }
278        });
279        if let Some(controller) = phlow_engine::debug::debug_controller() {
280            controller.set_script(phlow.script()).await;
281        }
282
283        let start_step = if let Some(step_id) = settings.start_step.as_deref() {
284            match phlow.find_step_reference(step_id) {
285                Some(step_ref) => Some(step_ref),
286                None => {
287                    return Err(RuntimeError::FlowExecutionError(format!(
288                        "Step id '{}' not found",
289                        step_id
290                    )));
291                }
292            }
293        } else {
294            None
295        };
296
297        drop(steps);
298
299        let mut handles = Vec::new();
300        let default_context = default_context.clone();
301
302        for _i in 0..settings.package_consumer_count {
303            let rx_main_pkg = rx_main_package.clone();
304            let phlow = phlow.clone();
305            let default_context = default_context.clone();
306            let start_step = start_step.clone();
307
308            let handle = tokio::task::spawn_blocking(move || {
309                for mut main_package in rx_main_pkg {
310                    let phlow = phlow.clone();
311                    let parent = match main_package.span.clone() {
312                        Some(span) => span,
313                        None => {
314                            error!("Span not found in main module");
315                            continue;
316                        }
317                    };
318                    let dispatch = match main_package.dispatch.clone() {
319                        Some(dispatch) => dispatch,
320                        None => {
321                            error!("Dispatch not found in main module");
322                            continue;
323                        }
324                    };
325
326                    let mut context = {
327                        let data = main_package.get_data().cloned().unwrap_or(Value::Null);
328                        if let Some(mut context) = default_context.clone() {
329                            context.set_main(data);
330                            context
331                        } else {
332                            Context::from_main(data)
333                        }
334                    };
335                    let start_step = start_step.clone();
336
337                    tokio::task::block_in_place(move || {
338                        dispatcher::with_default(&dispatch, || {
339                            let _enter = parent.enter();
340                            let rt = tokio::runtime::Handle::current();
341
342                            rt.block_on(async {
343                                let result = if let Some(step_ref) = start_step.clone() {
344                                    phlow.execute_from(&mut context, step_ref).await
345                                } else {
346                                    phlow.execute(&mut context).await
347                                };
348                                match result {
349                                    Ok(result) => {
350                                        let result_value = result.unwrap_or(Value::Undefined);
351                                        main_package.send(result_value);
352                                    }
353                                    Err(err) => {
354                                        error!("Runtime Error Execute Steps: {:?}", err);
355                                    }
356                                }
357                            });
358                        });
359                    });
360                }
361            });
362
363            handles.push(handle);
364        }
365
366        join_all(handles).await;
367
368        Ok(())
369    }
370
371    pub async fn run(
372        loader: Loader,
373        dispatch: Dispatch,
374        settings: Settings,
375    ) -> Result<(), RuntimeError> {
376        // -------------------------
377        // Create the channels
378        // -------------------------
379        let (tx_main_package, rx_main_package) = channel::unbounded::<Package>();
380
381        let var_payload_value = match &settings.var_payload {
382            Some(var_payload_str) => Some(parse_cli_value("var-payload", var_payload_str)?),
383            None => None,
384        };
385        let default_context = var_payload_value.as_ref().map(|payload| {
386            let mut context = Context::new();
387            context.add_step_payload(Some(payload.clone()));
388            context
389        });
390
391        let no_main = loader.main == -1 || settings.var_main.is_some();
392        let steps = loader.get_steps();
393        let inline_modules = InlineModules::default();
394        let modules = Self::load_modules(
395            loader,
396            dispatch.clone(),
397            settings.clone(),
398            tx_main_package.clone(),
399            &inline_modules,
400        )
401        .await?;
402
403        // Se não há main definido ou --var-main foi especificado, forçar o início dos steps
404        if no_main {
405            // Criar um span padrão para o início dos steps
406            let span = tracing::span!(
407                tracing::Level::INFO,
408                "auto_start_steps",
409                otel.name = "phlow auto start"
410            );
411
412            // Se --var-main foi especificado, processar o valor usando valu3
413            let request_data = if let Some(var_main_str) = &settings.var_main {
414                // Usar valu3 para processar o valor da mesma forma que outros valores
415                Some(parse_cli_value("var-main", var_main_str)?)
416            } else {
417                None
418            };
419
420            // Enviar um pacote com os dados do --var-main para iniciar os steps
421            let package = Package {
422                response: None,
423                request_data,
424                origin: 0,
425                span: Some(span),
426                dispatch: Some(dispatch.clone()),
427            };
428
429            if let Err(err) = tx_main_package.send(package) {
430                error!("Failed to send package: {:?}", err);
431                return Err(RuntimeError::FlowExecutionError(
432                    "Failed to send package".to_string(),
433                ));
434            }
435
436            if settings.var_main.is_some() {
437                info!("Using --var-main to simulate main module output");
438            }
439        }
440
441        drop(tx_main_package);
442
443        #[cfg(target_env = "gnu")]
444        if settings.garbage_collection {
445            thread::spawn(move || {
446                loop {
447                    thread::sleep(std::time::Duration::from_secs(
448                        settings.garbage_collection_interval,
449                    ));
450                    force_memory_release(settings.min_allocated_memory);
451                }
452            });
453        }
454
455        info!("Phlow!");
456
457        // -------------------------
458        // Create the phlow
459        // -------------------------
460        Self::listener(rx_main_package, steps, modules, settings, default_context)
461            .await
462            .map_err(|err| {
463                error!("Runtime Error: {:?}", err);
464                err
465            })?;
466
467        Ok(())
468    }
469
470    pub async fn run_script(
471        tx_main_package: channel::Sender<Package>,
472        rx_main_package: channel::Receiver<Package>,
473        loader: Loader,
474        dispatch: Dispatch,
475        settings: Settings,
476        context: Context,
477    ) -> Result<(), RuntimeError> {
478        let inline_modules = InlineModules::default();
479        Self::run_script_with_modules(
480            tx_main_package,
481            rx_main_package,
482            loader,
483            dispatch,
484            settings,
485            context,
486            inline_modules,
487        )
488        .await
489    }
490
491    pub async fn run_script_with_modules(
492        tx_main_package: channel::Sender<Package>,
493        rx_main_package: channel::Receiver<Package>,
494        loader: Loader,
495        dispatch: Dispatch,
496        settings: Settings,
497        context: Context,
498        inline_modules: InlineModules,
499    ) -> Result<(), RuntimeError> {
500        let steps = loader.get_steps();
501        let context = if let Some(var_payload_str) = &settings.var_payload {
502            let payload = parse_cli_value("var-payload", var_payload_str)?;
503            context.clone_with_output(payload)
504        } else {
505            context
506        };
507
508        let modules = Self::load_modules(
509            loader,
510            dispatch.clone(),
511            settings.clone(),
512            tx_main_package.clone(),
513            &inline_modules,
514        )
515        .await?;
516
517        drop(tx_main_package);
518
519        Self::listener(rx_main_package, steps, modules, settings, Some(context))
520            .await
521            .map_err(|err| {
522                error!("Runtime Error: {:?}", err);
523                err
524            })?;
525
526        Ok(())
527    }
528}