1use crate::loader::{Loader, load_module};
2use crate::inline_module::{InlineModules, PhlowModuleRequest};
3#[cfg(target_env = "gnu")]
4use crate::memory::force_memory_release;
5use crate::settings::Settings;
6use crossbeam::channel;
7use futures::future::join_all;
8use log::{debug, error, info, warn};
9use phlow_engine::phs::{Script, ScriptError, build_engine};
10use phlow_engine::{Context, Phlow};
11use phlow_sdk::structs::Package;
12use phlow_sdk::tokio;
13use phlow_sdk::{
14 prelude::{Array, Value},
15 structs::{ModulePackage, ModuleSetup, Modules},
16 tracing::{self, Dispatch, dispatcher},
17};
18use std::collections::HashSet;
19use std::fmt::Display;
20use std::sync::Arc;
21use std::thread;
22use tokio::sync::oneshot;
23
24#[derive(Debug)]
25pub enum RuntimeError {
26 ModuleWithError(ScriptError),
27 ModuleRegisterError,
28 FlowExecutionError(String),
29 InlineModuleError(String),
30}
31
32impl Display for RuntimeError {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 match self {
35 RuntimeError::ModuleRegisterError => write!(f, "Module register error"),
36 RuntimeError::FlowExecutionError(err) => write!(f, "Flow execution error: {}", err),
37 RuntimeError::ModuleWithError(err) => write!(f, "Module with error: {}", err),
38 RuntimeError::InlineModuleError(err) => write!(f, "Inline module error: {}", err),
39 }
40 }
41}
42
43fn parse_cli_value(flag: &str, value: &str) -> Result<Value, RuntimeError> {
44 match Value::json_to_value(value) {
45 Ok(parsed) => Ok(parsed),
46 Err(err) => {
47 error!("Failed to parse --{} value '{}': {:?}", flag, value, err);
48 Err(RuntimeError::FlowExecutionError(format!(
49 "Failed to parse --{} value: {:?}",
50 flag, err
51 )))
52 }
53 }
54}
55
56pub struct Runtime {}
57
58fn spawn_inline_module_worker(
59 name: String,
60 handler: crate::inline_module::PhlowModuleHandler,
61 with: Value,
62 app_data: phlow_sdk::structs::ApplicationData,
63 dispatch: Dispatch,
64 runtime_handle: tokio::runtime::Handle,
65 receiver: channel::Receiver<ModulePackage>,
66) {
67 thread::spawn(move || {
68 for package in receiver {
69 let request = PhlowModuleRequest {
70 input: package.input(),
71 payload: package.payload(),
72 with: with.clone(),
73 app_data: app_data.clone(),
74 dispatch: dispatch.clone(),
75 };
76
77 let response = dispatcher::with_default(&dispatch, || {
78 runtime_handle.block_on((handler)(request))
79 });
80
81 if package.sender.send(response).is_err() {
82 debug!("Inline module '{}' response channel closed", name);
83 }
84 }
85
86 debug!("Inline module '{}' stopped", name);
87 });
88}
89
90impl Runtime {
91 async fn load_modules(
92 loader: Loader,
93 dispatch: Dispatch,
94 settings: Settings,
95 tx_main_package: channel::Sender<Package>,
96 inline_modules: &InlineModules,
97 ) -> Result<Modules, RuntimeError> {
98 let mut modules = Modules::default();
99 let engine = build_engine(None);
100 let runtime_handle = tokio::runtime::Handle::current();
101 let app_data = loader.app_data.clone();
105 let loader_main_id = loader.main.clone();
106 let mut unused_inline: HashSet<String> = inline_modules.keys().cloned().collect();
107
108 for (id, module) in loader.modules.into_iter().enumerate() {
109 let (setup_sender, setup_receive) =
110 oneshot::channel::<Option<channel::Sender<ModulePackage>>>();
111
112 let is_main = loader_main_id == id as i32;
113 let main_sender = if is_main && settings.var_main.is_none() {
115 Some(tx_main_package.clone())
116 } else {
117 None
118 };
119
120 let with = {
121 let script = match Script::try_build(engine.clone(), &module.with) {
122 Ok(payload) => payload,
123 Err(err) => return Err(RuntimeError::ModuleWithError(err)),
124 };
125
126 let with: Value = script
127 .evaluate_without_context()
128 .map_err(|err| RuntimeError::ModuleWithError(err))?;
129
130 log::debug!(
131 "Module '{}' with: {}",
132 module.name,
133 with.to_json(phlow_sdk::prelude::JsonMode::Indented)
134 ); with
136 };
137
138 let inline_module = inline_modules.get(&module.name).cloned();
139 if inline_module.is_some() {
140 unused_inline.remove(&module.name);
141 }
142
143 if inline_module.is_some() && is_main && settings.var_main.is_none() {
144 return Err(RuntimeError::InlineModuleError(format!(
145 "Inline module '{}' is declared as main, but runtime is waiting for main output",
146 module.name
147 )));
148 }
149
150 let mut module_data = module;
151 let mut inline_worker = None;
152
153 if let Some(inline_module) = inline_module {
154 let handler = inline_module.handler().ok_or_else(|| {
155 RuntimeError::InlineModuleError(format!(
156 "Inline module '{}' is missing a handler",
157 module_data.name
158 ))
159 })?;
160
161 let schema = inline_module.schema();
162 if !schema.input.is_null() {
163 module_data.input = schema.input.clone();
164 }
165 if !schema.output.is_null() {
166 module_data.output = schema.output.clone();
167 }
168 if !schema.input_order.is_empty() {
169 module_data.input_order = Value::Array(Array::from(schema.input_order.clone()));
170 }
171 module_data.with = with.clone();
172
173 let (sender, receiver) = channel::unbounded::<ModulePackage>();
174 if setup_sender.send(Some(sender)).is_err() {
175 return Err(RuntimeError::InlineModuleError(format!(
176 "Inline module '{}' failed to register",
177 module_data.name
178 )));
179 }
180
181 inline_worker = Some((
182 module_data.name.clone(),
183 handler,
184 with,
185 app_data.clone(),
186 dispatch.clone(),
187 runtime_handle.clone(),
188 receiver,
189 ));
190 } else {
191 let setup = ModuleSetup {
192 id,
193 setup_sender,
194 main_sender,
195 with,
196 dispatch: dispatch.clone(),
197 app_data: app_data.clone(),
198 is_test_mode: false,
199 };
200
201 let module_target = module_data.module.clone();
202 let module_version = module_data.version.clone();
203 let local_path = module_data.local_path.clone();
204 let settings = settings.clone();
205
206 thread::spawn(move || {
207 let result: Result<(), crate::loader::error::Error> =
208 load_module(setup, &module_target, &module_version, local_path, settings);
209
210 if let Err(err) = result {
211 error!("Runtime Error Load Module: {:?}", err)
212 }
213 });
214
215 debug!(
216 "Module {} loaded with name \"{}\" and version \"{}\"",
217 module_data.module, module_data.name, module_data.version
218 );
219 }
220
221 match setup_receive.await {
222 Ok(Some(sender)) => {
223 debug!("Module {} registered", module_data.name);
224 modules.register(module_data, sender);
225 }
226 Ok(None) => {
227 debug!("Module {} did not register", module_data.name);
228 }
229 Err(_) => {
230 return Err(RuntimeError::ModuleRegisterError);
231 }
232 }
233
234 if let Some((
235 name,
236 handler,
237 with,
238 app_data,
239 dispatch,
240 runtime_handle,
241 receiver,
242 )) = inline_worker
243 {
244 spawn_inline_module_worker(
245 name,
246 handler,
247 with,
248 app_data,
249 dispatch,
250 runtime_handle,
251 receiver,
252 );
253 }
254 }
255
256 if !unused_inline.is_empty() {
257 warn!(
258 "Inline modules not declared in pipeline: {}",
259 unused_inline.into_iter().collect::<Vec<_>>().join(", ")
260 );
261 }
262
263 Ok(modules)
264 }
265
266 async fn listener(
267 rx_main_package: channel::Receiver<Package>,
268 steps: Value,
269 modules: Modules,
270 settings: Settings,
271 default_context: Option<Context>,
272 ) -> Result<(), RuntimeError> {
273 let phlow = Arc::new({
274 match Phlow::try_from_value(&steps, Some(Arc::new(modules))) {
275 Ok(phlow) => phlow,
276 Err(err) => return Err(RuntimeError::FlowExecutionError(err.to_string())),
277 }
278 });
279 if let Some(controller) = phlow_engine::debug::debug_controller() {
280 controller.set_script(phlow.script()).await;
281 }
282
283 let start_step = if let Some(step_id) = settings.start_step.as_deref() {
284 match phlow.find_step_reference(step_id) {
285 Some(step_ref) => Some(step_ref),
286 None => {
287 return Err(RuntimeError::FlowExecutionError(format!(
288 "Step id '{}' not found",
289 step_id
290 )));
291 }
292 }
293 } else {
294 None
295 };
296
297 drop(steps);
298
299 let mut handles = Vec::new();
300 let default_context = default_context.clone();
301
302 for _i in 0..settings.package_consumer_count {
303 let rx_main_pkg = rx_main_package.clone();
304 let phlow = phlow.clone();
305 let default_context = default_context.clone();
306 let start_step = start_step.clone();
307
308 let handle = tokio::task::spawn_blocking(move || {
309 for mut main_package in rx_main_pkg {
310 let phlow = phlow.clone();
311 let parent = match main_package.span.clone() {
312 Some(span) => span,
313 None => {
314 error!("Span not found in main module");
315 continue;
316 }
317 };
318 let dispatch = match main_package.dispatch.clone() {
319 Some(dispatch) => dispatch,
320 None => {
321 error!("Dispatch not found in main module");
322 continue;
323 }
324 };
325
326 let mut context = {
327 let data = main_package.get_data().cloned().unwrap_or(Value::Null);
328 if let Some(mut context) = default_context.clone() {
329 context.set_main(data);
330 context
331 } else {
332 Context::from_main(data)
333 }
334 };
335 let start_step = start_step.clone();
336
337 tokio::task::block_in_place(move || {
338 dispatcher::with_default(&dispatch, || {
339 let _enter = parent.enter();
340 let rt = tokio::runtime::Handle::current();
341
342 rt.block_on(async {
343 let result = if let Some(step_ref) = start_step.clone() {
344 phlow.execute_from(&mut context, step_ref).await
345 } else {
346 phlow.execute(&mut context).await
347 };
348 match result {
349 Ok(result) => {
350 let result_value = result.unwrap_or(Value::Undefined);
351 main_package.send(result_value);
352 }
353 Err(err) => {
354 error!("Runtime Error Execute Steps: {:?}", err);
355 }
356 }
357 });
358 });
359 });
360 }
361 });
362
363 handles.push(handle);
364 }
365
366 join_all(handles).await;
367
368 Ok(())
369 }
370
371 pub async fn run(
372 loader: Loader,
373 dispatch: Dispatch,
374 settings: Settings,
375 ) -> Result<(), RuntimeError> {
376 let (tx_main_package, rx_main_package) = channel::unbounded::<Package>();
380
381 let var_payload_value = match &settings.var_payload {
382 Some(var_payload_str) => Some(parse_cli_value("var-payload", var_payload_str)?),
383 None => None,
384 };
385 let default_context = var_payload_value.as_ref().map(|payload| {
386 let mut context = Context::new();
387 context.add_step_payload(Some(payload.clone()));
388 context
389 });
390
391 let no_main = loader.main == -1 || settings.var_main.is_some();
392 let steps = loader.get_steps();
393 let inline_modules = InlineModules::default();
394 let modules = Self::load_modules(
395 loader,
396 dispatch.clone(),
397 settings.clone(),
398 tx_main_package.clone(),
399 &inline_modules,
400 )
401 .await?;
402
403 if no_main {
405 let span = tracing::span!(
407 tracing::Level::INFO,
408 "auto_start_steps",
409 otel.name = "phlow auto start"
410 );
411
412 let request_data = if let Some(var_main_str) = &settings.var_main {
414 Some(parse_cli_value("var-main", var_main_str)?)
416 } else {
417 None
418 };
419
420 let package = Package {
422 response: None,
423 request_data,
424 origin: 0,
425 span: Some(span),
426 dispatch: Some(dispatch.clone()),
427 };
428
429 if let Err(err) = tx_main_package.send(package) {
430 error!("Failed to send package: {:?}", err);
431 return Err(RuntimeError::FlowExecutionError(
432 "Failed to send package".to_string(),
433 ));
434 }
435
436 if settings.var_main.is_some() {
437 info!("Using --var-main to simulate main module output");
438 }
439 }
440
441 drop(tx_main_package);
442
443 #[cfg(target_env = "gnu")]
444 if settings.garbage_collection {
445 thread::spawn(move || {
446 loop {
447 thread::sleep(std::time::Duration::from_secs(
448 settings.garbage_collection_interval,
449 ));
450 force_memory_release(settings.min_allocated_memory);
451 }
452 });
453 }
454
455 info!("Phlow!");
456
457 Self::listener(rx_main_package, steps, modules, settings, default_context)
461 .await
462 .map_err(|err| {
463 error!("Runtime Error: {:?}", err);
464 err
465 })?;
466
467 Ok(())
468 }
469
470 pub async fn run_script(
471 tx_main_package: channel::Sender<Package>,
472 rx_main_package: channel::Receiver<Package>,
473 loader: Loader,
474 dispatch: Dispatch,
475 settings: Settings,
476 context: Context,
477 ) -> Result<(), RuntimeError> {
478 let inline_modules = InlineModules::default();
479 Self::run_script_with_modules(
480 tx_main_package,
481 rx_main_package,
482 loader,
483 dispatch,
484 settings,
485 context,
486 inline_modules,
487 )
488 .await
489 }
490
491 pub async fn run_script_with_modules(
492 tx_main_package: channel::Sender<Package>,
493 rx_main_package: channel::Receiver<Package>,
494 loader: Loader,
495 dispatch: Dispatch,
496 settings: Settings,
497 context: Context,
498 inline_modules: InlineModules,
499 ) -> Result<(), RuntimeError> {
500 let steps = loader.get_steps();
501 let context = if let Some(var_payload_str) = &settings.var_payload {
502 let payload = parse_cli_value("var-payload", var_payload_str)?;
503 context.clone_with_output(payload)
504 } else {
505 context
506 };
507
508 let modules = Self::load_modules(
509 loader,
510 dispatch.clone(),
511 settings.clone(),
512 tx_main_package.clone(),
513 &inline_modules,
514 )
515 .await?;
516
517 drop(tx_main_package);
518
519 Self::listener(rx_main_package, steps, modules, settings, Some(context))
520 .await
521 .map_err(|err| {
522 error!("Runtime Error: {:?}", err);
523 err
524 })?;
525
526 Ok(())
527 }
528}