use crate::runtime::Runtime;
use crate::settings::Settings;
use log::{debug, error};
use phlow_engine::Context;
use phlow_sdk::{
module_channel,
structs::{ModuleResponse, ModuleSetup},
};
use phlow_sdk::{otel, prelude::*};
use crate::loader::Loader;
pub fn run_script(path: &str, setup: ModuleSetup, settings: &Settings) {
debug!("Running script at path: {}", path);
let dispatch = setup.dispatch.clone();
tracing::dispatcher::with_default(&dispatch, || {
let _guard = otel::init_tracing_subscriber(setup.app_data.clone());
use_log!();
if let Ok(rt) = tokio::runtime::Runtime::new() {
rt.block_on(async move {
let analyzer = crate::analyzer::Analyzer::from_settings(settings);
let loader = match Loader::load(
&path,
settings.print_yaml,
settings.print_output,
Some(&analyzer),
)
.await
{
Ok(loader) => loader,
Err(err) => {
error!("Runtime error loading script module: {:?}", err);
return;
}
};
let (tx_main_package, rx_main_package) = channel::unbounded::<Package>();
let app_data = loader.app_data.clone();
let dispatch = setup.dispatch.clone();
let dispatch_for_runtime = dispatch.clone();
let settings_cloned = settings.clone();
let tx_for_runtime = tx_main_package.clone();
let context = Context::from_setup(setup.with.clone());
let runtime_handle = tokio::task::spawn(async move {
Runtime::run_script(
tx_for_runtime,
rx_main_package,
loader,
dispatch_for_runtime,
settings_cloned,
context,
)
.await
});
let rx = module_channel!(setup);
debug!("Script module loaded, starting main loop");
for package in rx {
debug!("Received package: {:?}", package);
let span = tracing::span!(
tracing::Level::INFO,
"auto_start_steps",
otel.name = app_data.name.clone().unwrap_or("unknown".to_string()),
);
let (response_tx, response_rx) = tokio::sync::oneshot::channel::<Value>();
let runtime_package = Package {
response: Some(response_tx),
request_data: package.input(),
origin: 0,
span: Some(span),
dispatch: Some(dispatch.clone()),
};
debug!("Sending package to main loop: {:?}", runtime_package);
if let Err(err) = tx_main_package.send(runtime_package) {
error!("Failed to send package: {:?}", err);
continue;
}
debug!("Package sent to main loop, waiting for response");
let response = match response_rx.await {
Ok(result) if result.is_undefined() => ModuleResponse::from_success(
package.payload().unwrap_or(Value::Undefined),
),
Ok(result) => ModuleResponse::from_success(result),
Err(err) => ModuleResponse::from_error(format!("Runtime error: {}", err)),
};
if let Err(err) = package.sender.send(response) {
error!("Failed to send response back to module: {:?}", err);
}
debug!("Response sent back to module");
}
debug!("Script module no listeners, waiting for runtime to finish");
match runtime_handle.await {
Ok(Ok(())) => {}
Ok(Err(err)) => {
error!("Runtime error: {:?}", err);
}
Err(err) => {
error!("Runtime task error: {:?}", err);
}
}
});
} else {
tracing::error!("Error creating runtime");
return;
}
});
}