phlow_runtime/
scripts.rs

1use crate::runtime::Runtime;
2use crate::settings::Settings;
3use log::{debug, error};
4use phlow_engine::Context;
5use phlow_sdk::{
6    module_channel,
7    structs::{ModuleResponse, ModuleSetup},
8};
9use phlow_sdk::{otel, prelude::*};
10
11use crate::loader::Loader;
12
13pub fn run_script(path: &str, setup: ModuleSetup, settings: &Settings) {
14    debug!("Running script at path: {}", path);
15    let dispatch = setup.dispatch.clone();
16
17    tracing::dispatcher::with_default(&dispatch, || {
18        let _guard = otel::init_tracing_subscriber(setup.app_data.clone());
19        use_log!();
20
21        if let Ok(rt) = tokio::runtime::Runtime::new() {
22            rt.block_on(async move {
23                // build an Analyzer from settings and pass to loader so analyzer runs during load
24                let analyzer = crate::analyzer::Analyzer::from_settings(settings);
25                let loader = match Loader::load(
26                    &path,
27                    settings.print_yaml,
28                    settings.print_output,
29                    Some(&analyzer),
30                )
31                .await
32                {
33                    Ok(loader) => loader,
34                    Err(err) => {
35                        error!("Runtime error loading script module: {:?}", err);
36                        return;
37                    }
38                };
39                let (tx_main_package, rx_main_package) = channel::unbounded::<Package>();
40                let app_data = loader.app_data.clone();
41                let dispatch = setup.dispatch.clone();
42                let dispatch_for_runtime = dispatch.clone();
43                let settings_cloned = settings.clone();
44
45                // Criar uma task para o runtime que não irá dropar o tx_main_package
46                let tx_for_runtime = tx_main_package.clone();
47                let context = Context::from_setup(setup.with.clone());
48
49                let runtime_handle = tokio::task::spawn(async move {
50                    Runtime::run_script(
51                        tx_for_runtime,
52                        rx_main_package,
53                        loader,
54                        dispatch_for_runtime,
55                        settings_cloned,
56                        context,
57                    )
58                    .await
59                });
60
61                let rx = module_channel!(setup);
62
63                debug!("Script module loaded, starting main loop");
64
65                for package in rx {
66                    debug!("Received package: {:?}", package);
67
68                    let span = tracing::span!(
69                        tracing::Level::INFO,
70                        "auto_start_steps",
71                        otel.name = app_data.name.clone().unwrap_or("unknown".to_string()),
72                    );
73
74                    // Criar um canal para receber a resposta do runtime
75                    let (response_tx, response_rx) = tokio::sync::oneshot::channel::<Value>();
76
77                    let runtime_package = Package {
78                        response: Some(response_tx),
79                        request_data: package.input(),
80                        origin: 0,
81                        span: Some(span),
82                        dispatch: Some(dispatch.clone()),
83                    };
84
85                    debug!("Sending package to main loop: {:?}", runtime_package);
86
87                    if let Err(err) = tx_main_package.send(runtime_package) {
88                        error!("Failed to send package: {:?}", err);
89                        continue;
90                    }
91
92                    debug!("Package sent to main loop, waiting for response");
93
94                    let response = match response_rx.await {
95                        Ok(result) if result.is_undefined() => ModuleResponse::from_success(
96                            package.payload().unwrap_or(Value::Undefined),
97                        ),
98                        Ok(result) => ModuleResponse::from_success(result),
99                        Err(err) => ModuleResponse::from_error(format!("Runtime error: {}", err)),
100                    };
101
102                    if let Err(err) = package.sender.send(response) {
103                        error!("Failed to send response back to module: {:?}", err);
104                    }
105
106                    debug!("Response sent back to module");
107                }
108
109                debug!("Script module no listeners, waiting for runtime to finish");
110
111                match runtime_handle.await {
112                    Ok(Ok(())) => {}
113                    Ok(Err(err)) => {
114                        error!("Runtime error: {:?}", err);
115                    }
116                    Err(err) => {
117                        error!("Runtime task error: {:?}", err);
118                    }
119                }
120            });
121        } else {
122            tracing::error!("Error creating runtime");
123            return;
124        }
125    });
126}