1use crate::runtime::Runtime;
2use crate::settings::Settings;
3use log::{debug, error};
4use phlow_engine::Context;
5use phlow_sdk::{
6 module_channel,
7 structs::{ModuleResponse, ModuleSetup},
8};
9use phlow_sdk::{otel, prelude::*};
10
11use crate::loader::Loader;
12
13pub fn run_script(path: &str, setup: ModuleSetup, settings: &Settings) {
14 debug!("Running script at path: {}", path);
15 let dispatch = setup.dispatch.clone();
16
17 tracing::dispatcher::with_default(&dispatch, || {
18 let _guard = otel::init_tracing_subscriber(setup.app_data.clone());
19 use_log!();
20
21 if let Ok(rt) = tokio::runtime::Runtime::new() {
22 rt.block_on(async move {
23 let analyzer = crate::analyzer::Analyzer::from_settings(settings);
25 let loader = match Loader::load(
26 &path,
27 settings.print_yaml,
28 settings.print_output,
29 Some(&analyzer),
30 )
31 .await
32 {
33 Ok(loader) => loader,
34 Err(err) => {
35 error!("Runtime error loading script module: {:?}", err);
36 return;
37 }
38 };
39 let (tx_main_package, rx_main_package) = channel::unbounded::<Package>();
40 let app_data = loader.app_data.clone();
41 let dispatch = setup.dispatch.clone();
42 let dispatch_for_runtime = dispatch.clone();
43 let settings_cloned = settings.clone();
44
45 let tx_for_runtime = tx_main_package.clone();
47 let context = Context::from_setup(setup.with.clone());
48
49 let runtime_handle = tokio::task::spawn(async move {
50 Runtime::run_script(
51 tx_for_runtime,
52 rx_main_package,
53 loader,
54 dispatch_for_runtime,
55 settings_cloned,
56 context,
57 )
58 .await
59 });
60
61 let rx = module_channel!(setup);
62
63 debug!("Script module loaded, starting main loop");
64
65 for package in rx {
66 debug!("Received package: {:?}", package);
67
68 let span = tracing::span!(
69 tracing::Level::INFO,
70 "auto_start_steps",
71 otel.name = app_data.name.clone().unwrap_or("unknown".to_string()),
72 );
73
74 let (response_tx, response_rx) = tokio::sync::oneshot::channel::<Value>();
76
77 let runtime_package = Package {
78 response: Some(response_tx),
79 request_data: package.input(),
80 origin: 0,
81 span: Some(span),
82 dispatch: Some(dispatch.clone()),
83 };
84
85 debug!("Sending package to main loop: {:?}", runtime_package);
86
87 if let Err(err) = tx_main_package.send(runtime_package) {
88 error!("Failed to send package: {:?}", err);
89 continue;
90 }
91
92 debug!("Package sent to main loop, waiting for response");
93
94 let response = match response_rx.await {
95 Ok(result) if result.is_undefined() => ModuleResponse::from_success(
96 package.payload().unwrap_or(Value::Undefined),
97 ),
98 Ok(result) => ModuleResponse::from_success(result),
99 Err(err) => ModuleResponse::from_error(format!("Runtime error: {}", err)),
100 };
101
102 if let Err(err) = package.sender.send(response) {
103 error!("Failed to send response back to module: {:?}", err);
104 }
105
106 debug!("Response sent back to module");
107 }
108
109 debug!("Script module no listeners, waiting for runtime to finish");
110
111 match runtime_handle.await {
112 Ok(Ok(())) => {}
113 Ok(Err(err)) => {
114 error!("Runtime error: {:?}", err);
115 }
116 Err(err) => {
117 error!("Runtime task error: {:?}", err);
118 }
119 }
120 });
121 } else {
122 tracing::error!("Error creating runtime");
123 return;
124 }
125 });
126}