// phlow-runtime 0.4.2
//
// Phlow is a fast, modular runtime for building backends with YAML flows, Rust modules, and native OpenTelemetry observability.
// Documentation
use crate::runtime::Runtime;
use crate::settings::Settings;
use log::{debug, error};
use phlow_engine::Context;
use phlow_sdk::{
    module_channel,
    structs::{ModuleResponse, ModuleSetup},
};
use phlow_sdk::{otel, prelude::*};

use crate::loader::Loader;

pub fn run_script(path: &str, setup: ModuleSetup, settings: &Settings) {
    debug!("Running script at path: {}", path);
    let dispatch = setup.dispatch.clone();

    tracing::dispatcher::with_default(&dispatch, || {
        let _guard = otel::init_tracing_subscriber(setup.app_data.clone());
        use_log!();

        if let Ok(rt) = tokio::runtime::Runtime::new() {
            rt.block_on(async move {
                // build an Analyzer from settings and pass to loader so analyzer runs during load
                let analyzer = crate::analyzer::Analyzer::from_settings(settings);
                let loader = match Loader::load(
                    &path,
                    settings.print_yaml,
                    settings.print_output,
                    Some(&analyzer),
                )
                .await
                {
                    Ok(loader) => loader,
                    Err(err) => {
                        error!("Runtime error loading script module: {:?}", err);
                        return;
                    }
                };
                let (tx_main_package, rx_main_package) = channel::unbounded::<Package>();
                let app_data = loader.app_data.clone();
                let dispatch = setup.dispatch.clone();
                let dispatch_for_runtime = dispatch.clone();
                let settings_cloned = settings.clone();

                // Criar uma task para o runtime que não irá dropar o tx_main_package
                let tx_for_runtime = tx_main_package.clone();
                let context = Context::from_setup(setup.with.clone());

                let runtime_handle = tokio::task::spawn(async move {
                    Runtime::run_script(
                        tx_for_runtime,
                        rx_main_package,
                        loader,
                        dispatch_for_runtime,
                        settings_cloned,
                        context,
                    )
                    .await
                });

                let rx = module_channel!(setup);

                debug!("Script module loaded, starting main loop");

                for package in rx {
                    debug!("Received package: {:?}", package);

                    let span = tracing::span!(
                        tracing::Level::INFO,
                        "auto_start_steps",
                        otel.name = app_data.name.clone().unwrap_or("unknown".to_string()),
                    );

                    // Criar um canal para receber a resposta do runtime
                    let (response_tx, response_rx) = tokio::sync::oneshot::channel::<Value>();

                    let runtime_package = Package {
                        response: Some(response_tx),
                        request_data: package.input(),
                        origin: 0,
                        span: Some(span),
                        dispatch: Some(dispatch.clone()),
                    };

                    debug!("Sending package to main loop: {:?}", runtime_package);

                    if let Err(err) = tx_main_package.send(runtime_package) {
                        error!("Failed to send package: {:?}", err);
                        continue;
                    }

                    debug!("Package sent to main loop, waiting for response");

                    let response = match response_rx.await {
                        Ok(result) if result.is_undefined() => ModuleResponse::from_success(
                            package.payload().unwrap_or(Value::Undefined),
                        ),
                        Ok(result) => ModuleResponse::from_success(result),
                        Err(err) => ModuleResponse::from_error(format!("Runtime error: {}", err)),
                    };

                    if let Err(err) = package.sender.send(response) {
                        error!("Failed to send response back to module: {:?}", err);
                    }

                    debug!("Response sent back to module");
                }

                debug!("Script module no listeners, waiting for runtime to finish");

                match runtime_handle.await {
                    Ok(Ok(())) => {}
                    Ok(Err(err)) => {
                        error!("Runtime error: {:?}", err);
                    }
                    Err(err) => {
                        error!("Runtime task error: {:?}", err);
                    }
                }
            });
        } else {
            tracing::error!("Error creating runtime");
            return;
        }
    });
}