1use crate::debug_server;
88use crate::inline_module::{InlineModules, PhlowModule};
89use crate::loader::Loader;
90use crate::runtime::Runtime;
91use crate::runtime::RuntimeError;
92use crate::settings::Settings;
93use crossbeam::channel;
94use phlow_engine::Context;
95use phlow_sdk::otel::{OtelGuard, init_tracing_subscriber};
96use phlow_sdk::prelude::Value;
97use phlow_sdk::structs::Package;
98use phlow_sdk::{tracing, use_log};
99use std::fmt::{Display, Formatter};
100use std::path::PathBuf;
101use std::sync::Arc;
102
103#[derive(Debug)]
105pub enum PhlowRuntimeError {
106 MissingPipeline,
108 LoaderError(crate::loader::error::Error),
110 PackageSendError,
112 ResponseChannelClosed,
114 RuntimeError(RuntimeError),
116 RuntimeJoinError(tokio::task::JoinError),
118}
119
120impl Display for PhlowRuntimeError {
121 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
122 match self {
123 PhlowRuntimeError::MissingPipeline => write!(f, "Pipeline not set"),
124 PhlowRuntimeError::LoaderError(err) => write!(f, "Loader error: {}", err),
125 PhlowRuntimeError::PackageSendError => write!(f, "Failed to send package"),
126 PhlowRuntimeError::ResponseChannelClosed => write!(f, "Response channel closed"),
127 PhlowRuntimeError::RuntimeError(err) => write!(f, "Runtime error: {}", err),
128 PhlowRuntimeError::RuntimeJoinError(err) => write!(f, "Runtime task error: {}", err),
129 }
130 }
131}
132
133impl std::error::Error for PhlowRuntimeError {}
134
135impl From<crate::loader::error::Error> for PhlowRuntimeError {
136 fn from(err: crate::loader::error::Error) -> Self {
137 PhlowRuntimeError::LoaderError(err)
138 }
139}
140
141impl From<RuntimeError> for PhlowRuntimeError {
142 fn from(err: RuntimeError) -> Self {
143 PhlowRuntimeError::RuntimeError(err)
144 }
145}
146
147pub struct PhlowRuntime {
149 pipeline: Option<Value>,
150 context: Option<Context>,
151 settings: Settings,
152 base_path: Option<PathBuf>,
153 dispatch: Option<tracing::Dispatch>,
154 inline_modules: InlineModules,
155 prepared: Option<PreparedRuntime>,
156}
157
158pub struct PhlowBuilder {
162 pipeline: Option<Value>,
163 context: Option<Context>,
164 settings: Settings,
165 base_path: Option<PathBuf>,
166 dispatch: Option<tracing::Dispatch>,
167 inline_modules: InlineModules,
168}
169
170impl Default for PhlowRuntime {
171 fn default() -> Self {
172 Self::new()
173 }
174}
175
176impl PhlowRuntime {
177 pub fn new() -> Self {
181 let mut settings = Settings::for_runtime();
182 if settings.var_main.is_none() {
183 settings.var_main = Some("__phlow_runtime__".to_string());
184 }
185
186 Self {
187 pipeline: None,
188 context: None,
189 settings,
190 base_path: None,
191 dispatch: None,
192 inline_modules: InlineModules::default(),
193 prepared: None,
194 }
195 }
196
197 pub fn with_settings(settings: Settings) -> Self {
199 Self {
200 pipeline: None,
201 context: None,
202 settings,
203 base_path: None,
204 dispatch: None,
205 inline_modules: InlineModules::default(),
206 prepared: None,
207 }
208 }
209
210 pub fn set_pipeline(&mut self, pipeline: Value) -> &mut Self {
214 self.pipeline = Some(pipeline);
215 self.prepared = None;
216 self
217 }
218
219 pub fn set_context(&mut self, context: Context) -> &mut Self {
223 self.context = Some(context);
224 self.prepared = None;
225 self
226 }
227
228 pub fn set_settings(&mut self, settings: Settings) -> &mut Self {
232 self.settings = settings;
233 self.prepared = None;
234 self
235 }
236
237 pub fn set_base_path<P: Into<PathBuf>>(&mut self, base_path: P) -> &mut Self {
241 self.base_path = Some(base_path.into());
242 self.prepared = None;
243 self
244 }
245
246 pub fn set_dispatch(&mut self, dispatch: tracing::Dispatch) -> &mut Self {
250 self.dispatch = Some(dispatch);
251 self.prepared = None;
252 self
253 }
254
255 pub fn set_module<S: Into<String>>(&mut self, name: S, module: PhlowModule) -> &mut Self {
262 self.inline_modules.insert(name.into(), module);
263 self.prepared = None;
264 self
265 }
266
267 pub fn settings(&self) -> &Settings {
269 &self.settings
270 }
271
272 pub fn settings_mut(&mut self) -> &mut Settings {
276 self.prepared = None;
277 &mut self.settings
278 }
279
280 pub async fn build(&mut self) -> Result<(), PhlowRuntimeError> {
284 if self.prepared.is_some() {
285 return Ok(());
286 }
287
288 use_log!();
289
290 let pipeline = self
291 .pipeline
292 .as_ref()
293 .ok_or(PhlowRuntimeError::MissingPipeline)?;
294
295 let base_path = self.base_path.clone().unwrap_or_else(|| {
296 std::env::current_dir().unwrap_or_else(|_| PathBuf::from("./"))
297 });
298
299 let mut loader = Loader::from_value(pipeline, Some(base_path.as_path()))?;
300
301 if self.settings.download {
302 loader
303 .download(&self.settings.default_package_repository_url)
304 .await?;
305 }
306
307 loader.update_info();
308
309 let mut guard: Option<OtelGuard> = None;
310 let dispatch = if let Some(dispatch) = self.dispatch.clone() {
311 dispatch
312 } else {
313 let next_guard = init_tracing_subscriber(loader.app_data.clone());
314 let dispatch = next_guard.dispatch.clone();
315 guard = Some(next_guard);
316 dispatch
317 };
318
319 let debug_enabled = std::env::var("PHLOW_DEBUG")
320 .map(|value| value.eq_ignore_ascii_case("true"))
321 .unwrap_or(false);
322 if debug_enabled {
323 let controller = Arc::new(phlow_engine::debug::DebugController::new());
324 match debug_server::spawn(controller.clone()).await {
325 Ok(()) => {
326 if phlow_engine::debug::set_debug_controller(controller).is_err() {
327 log::warn!("Debug controller already set");
328 }
329 log::info!("Phlow debug enabled");
330 }
331 Err(err) => {
332 log::error!("Failed to start debug server: {}", err);
333 }
334 }
335 }
336
337 let context = self.context.clone().unwrap_or_else(Context::new);
338 let request_data = context.get_main();
339 let context_for_runtime = context.clone();
340 let auto_start = self.settings.var_main.is_some()
341 || loader.main == -1
342 || context.get_main().is_some();
343
344 let app_name = loader
345 .app_data
346 .name
347 .clone()
348 .unwrap_or_else(|| "phlow runtime".to_string());
349
350 let settings = self.settings.clone();
351 let (tx_main_package, rx_main_package) = channel::unbounded::<Package>();
352 let tx_for_runtime = tx_main_package.clone();
353 let dispatch_for_runtime = dispatch.clone();
354 let inline_modules = self.inline_modules.clone();
355
356 let runtime_handle = tokio::spawn(async move {
357 tracing::dispatcher::with_default(&dispatch_for_runtime, || {
358 Runtime::run_script_with_modules(
359 tx_for_runtime,
360 rx_main_package,
361 loader,
362 dispatch_for_runtime.clone(),
363 settings,
364 context_for_runtime,
365 inline_modules,
366 )
367 })
368 .await
369 });
370
371 self.prepared = Some(PreparedRuntime {
372 tx_main_package,
373 dispatch,
374 runtime_handle,
375 guard,
376 app_name,
377 request_data,
378 auto_start,
379 });
380
381 Ok(())
382 }
383
384 pub async fn run(&mut self) -> Result<Value, PhlowRuntimeError> {
392 self.build().await?;
393
394 let auto_start = match self.prepared.as_ref() {
395 Some(prepared) => prepared.auto_start,
396 None => return Err(PhlowRuntimeError::MissingPipeline),
397 };
398
399 if !auto_start {
400 self.shutdown().await?;
401 return Ok(Value::Undefined);
402 }
403
404 let (tx_main_package, dispatch, app_name, request_data) = match self.prepared.as_ref() {
405 Some(prepared) => (
406 prepared.tx_main_package.clone(),
407 prepared.dispatch.clone(),
408 prepared.app_name.clone(),
409 prepared.request_data.clone(),
410 ),
411 None => return Err(PhlowRuntimeError::MissingPipeline),
412 };
413
414 let (response_tx, response_rx) = tokio::sync::oneshot::channel::<Value>();
415 let package = tracing::dispatcher::with_default(&dispatch, || {
416 let span = tracing::span!(
417 tracing::Level::INFO,
418 "phlow_run",
419 otel.name = app_name.as_str()
420 );
421
422 Package {
423 response: Some(response_tx),
424 request_data,
425 origin: 0,
426 span: Some(span),
427 dispatch: Some(dispatch.clone()),
428 }
429 });
430
431 if tx_main_package.send(package).is_err() {
432 return Err(PhlowRuntimeError::PackageSendError);
433 }
434
435 let result = response_rx
436 .await
437 .map_err(|_| PhlowRuntimeError::ResponseChannelClosed)?;
438
439 Ok(result)
440 }
441
442 pub async fn shutdown(&mut self) -> Result<(), PhlowRuntimeError> {
447 let prepared = match self.prepared.take() {
448 Some(prepared) => prepared,
449 None => return Ok(()),
450 };
451
452 drop(prepared.tx_main_package);
453
454 let runtime_result = prepared
455 .runtime_handle
456 .await
457 .map_err(PhlowRuntimeError::RuntimeJoinError)?;
458 runtime_result?;
459
460 drop(prepared.guard);
461
462 Ok(())
463 }
464}
465
466impl PhlowBuilder {
467 pub fn new() -> Self {
471 let mut settings = Settings::for_runtime();
472 if settings.var_main.is_none() {
473 settings.var_main = Some("__phlow_runtime__".to_string());
474 }
475
476 Self {
477 pipeline: None,
478 context: None,
479 settings,
480 base_path: None,
481 dispatch: None,
482 inline_modules: InlineModules::default(),
483 }
484 }
485
486 pub fn with_settings(settings: Settings) -> Self {
488 Self {
489 pipeline: None,
490 context: None,
491 settings,
492 base_path: None,
493 dispatch: None,
494 inline_modules: InlineModules::default(),
495 }
496 }
497
498 pub fn set_pipeline(mut self, pipeline: Value) -> Self {
502 self.pipeline = Some(pipeline);
503 self
504 }
505
506 pub fn set_context(mut self, context: Context) -> Self {
510 self.context = Some(context);
511 self
512 }
513
514 pub fn set_settings(mut self, settings: Settings) -> Self {
518 self.settings = settings;
519 self
520 }
521
522 pub fn set_base_path<P: Into<PathBuf>>(mut self, base_path: P) -> Self {
526 self.base_path = Some(base_path.into());
527 self
528 }
529
530 pub fn set_dispatch(mut self, dispatch: tracing::Dispatch) -> Self {
534 self.dispatch = Some(dispatch);
535 self
536 }
537
538 pub fn set_module<S: Into<String>>(mut self, name: S, module: PhlowModule) -> Self {
545 self.inline_modules.insert(name.into(), module);
546 self
547 }
548
549 pub fn settings(&self) -> &Settings {
551 &self.settings
552 }
553
554 pub fn settings_mut(&mut self) -> &mut Settings {
556 &mut self.settings
557 }
558
559 pub async fn build(mut self) -> Result<PhlowRuntime, PhlowRuntimeError> {
563 let mut runtime = PhlowRuntime::with_settings(self.settings);
564 runtime.inline_modules = self.inline_modules;
565
566 if let Some(pipeline) = self.pipeline.take() {
567 runtime.set_pipeline(pipeline);
568 }
569
570 if let Some(context) = self.context.take() {
571 runtime.set_context(context);
572 }
573
574 if let Some(base_path) = self.base_path.take() {
575 runtime.set_base_path(base_path);
576 }
577
578 if let Some(dispatch) = self.dispatch.take() {
579 runtime.set_dispatch(dispatch);
580 }
581
582 runtime.build().await?;
583 Ok(runtime)
584 }
585}
586
587impl Default for PhlowBuilder {
588 fn default() -> Self {
589 Self::new()
590 }
591}
592
593struct PreparedRuntime {
594 tx_main_package: channel::Sender<Package>,
595 dispatch: tracing::Dispatch,
596 runtime_handle: tokio::task::JoinHandle<Result<(), RuntimeError>>,
597 guard: Option<OtelGuard>,
598 app_name: String,
599 request_data: Option<Value>,
600 auto_start: bool,
601}