1use crate::debug_server;
33use crate::loader::Loader;
34use crate::runtime::Runtime;
35use crate::runtime::RuntimeError;
36use crate::settings::Settings;
37use crossbeam::channel;
38use phlow_engine::Context;
39use phlow_sdk::otel::{OtelGuard, init_tracing_subscriber};
40use phlow_sdk::prelude::Value;
41use phlow_sdk::structs::Package;
42use phlow_sdk::{tracing, use_log};
43use std::fmt::{Display, Formatter};
44use std::path::PathBuf;
45use std::sync::Arc;
46
47#[derive(Debug)]
49pub enum PhlowRuntimeError {
50 MissingPipeline,
52 LoaderError(crate::loader::error::Error),
54 PackageSendError,
56 ResponseChannelClosed,
58 RuntimeError(RuntimeError),
60 RuntimeJoinError(tokio::task::JoinError),
62}
63
64impl Display for PhlowRuntimeError {
65 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
66 match self {
67 PhlowRuntimeError::MissingPipeline => write!(f, "Pipeline not set"),
68 PhlowRuntimeError::LoaderError(err) => write!(f, "Loader error: {}", err),
69 PhlowRuntimeError::PackageSendError => write!(f, "Failed to send package"),
70 PhlowRuntimeError::ResponseChannelClosed => write!(f, "Response channel closed"),
71 PhlowRuntimeError::RuntimeError(err) => write!(f, "Runtime error: {}", err),
72 PhlowRuntimeError::RuntimeJoinError(err) => write!(f, "Runtime task error: {}", err),
73 }
74 }
75}
76
77impl std::error::Error for PhlowRuntimeError {}
78
79impl From<crate::loader::error::Error> for PhlowRuntimeError {
80 fn from(err: crate::loader::error::Error) -> Self {
81 PhlowRuntimeError::LoaderError(err)
82 }
83}
84
85impl From<RuntimeError> for PhlowRuntimeError {
86 fn from(err: RuntimeError) -> Self {
87 PhlowRuntimeError::RuntimeError(err)
88 }
89}
90
91pub struct PhlowRuntime {
93 pipeline: Option<Value>,
94 context: Option<Context>,
95 settings: Settings,
96 base_path: Option<PathBuf>,
97 dispatch: Option<tracing::Dispatch>,
98 prepared: Option<PreparedRuntime>,
99}
100
101pub struct PhlowBuilder {
105 pipeline: Option<Value>,
106 context: Option<Context>,
107 settings: Settings,
108 base_path: Option<PathBuf>,
109 dispatch: Option<tracing::Dispatch>,
110}
111
112impl Default for PhlowRuntime {
113 fn default() -> Self {
114 Self::new()
115 }
116}
117
118impl PhlowRuntime {
119 pub fn new() -> Self {
123 let mut settings = Settings::for_runtime();
124 if settings.var_main.is_none() {
125 settings.var_main = Some("__phlow_runtime__".to_string());
126 }
127
128 Self {
129 pipeline: None,
130 context: None,
131 settings,
132 base_path: None,
133 dispatch: None,
134 prepared: None,
135 }
136 }
137
138 pub fn with_settings(settings: Settings) -> Self {
140 Self {
141 pipeline: None,
142 context: None,
143 settings,
144 base_path: None,
145 dispatch: None,
146 prepared: None,
147 }
148 }
149
150 pub fn set_pipeline(&mut self, pipeline: Value) -> &mut Self {
154 self.pipeline = Some(pipeline);
155 self.prepared = None;
156 self
157 }
158
159 pub fn set_context(&mut self, context: Context) -> &mut Self {
163 self.context = Some(context);
164 self.prepared = None;
165 self
166 }
167
168 pub fn set_settings(&mut self, settings: Settings) -> &mut Self {
172 self.settings = settings;
173 self.prepared = None;
174 self
175 }
176
177 pub fn set_base_path<P: Into<PathBuf>>(&mut self, base_path: P) -> &mut Self {
181 self.base_path = Some(base_path.into());
182 self.prepared = None;
183 self
184 }
185
186 pub fn set_dispatch(&mut self, dispatch: tracing::Dispatch) -> &mut Self {
190 self.dispatch = Some(dispatch);
191 self.prepared = None;
192 self
193 }
194
195 pub fn settings(&self) -> &Settings {
197 &self.settings
198 }
199
200 pub fn settings_mut(&mut self) -> &mut Settings {
204 self.prepared = None;
205 &mut self.settings
206 }
207
208 pub async fn build(&mut self) -> Result<(), PhlowRuntimeError> {
212 if self.prepared.is_some() {
213 return Ok(());
214 }
215
216 use_log!();
217
218 let pipeline = self
219 .pipeline
220 .as_ref()
221 .ok_or(PhlowRuntimeError::MissingPipeline)?;
222
223 let base_path = self.base_path.clone().unwrap_or_else(|| {
224 std::env::current_dir().unwrap_or_else(|_| PathBuf::from("./"))
225 });
226
227 let mut loader = Loader::from_value(pipeline, Some(base_path.as_path()))?;
228
229 if self.settings.download {
230 loader
231 .download(&self.settings.default_package_repository_url)
232 .await?;
233 }
234
235 loader.update_info();
236
237 let mut guard: Option<OtelGuard> = None;
238 let dispatch = if let Some(dispatch) = self.dispatch.clone() {
239 dispatch
240 } else {
241 let next_guard = init_tracing_subscriber(loader.app_data.clone());
242 let dispatch = next_guard.dispatch.clone();
243 guard = Some(next_guard);
244 dispatch
245 };
246
247 let debug_enabled = std::env::var("PHLOW_DEBUG")
248 .map(|value| value.eq_ignore_ascii_case("true"))
249 .unwrap_or(false);
250 if debug_enabled {
251 let controller = Arc::new(phlow_engine::debug::DebugController::new());
252 match debug_server::spawn(controller.clone()).await {
253 Ok(()) => {
254 if phlow_engine::debug::set_debug_controller(controller).is_err() {
255 log::warn!("Debug controller already set");
256 }
257 log::info!("Phlow debug enabled");
258 }
259 Err(err) => {
260 log::error!("Failed to start debug server: {}", err);
261 }
262 }
263 }
264
265 let context = self.context.clone().unwrap_or_else(Context::new);
266 let request_data = context.get_main();
267 let context_for_runtime = context.clone();
268 let auto_start = self.settings.var_main.is_some()
269 || loader.main == -1
270 || context.get_main().is_some();
271
272 let app_name = loader
273 .app_data
274 .name
275 .clone()
276 .unwrap_or_else(|| "phlow runtime".to_string());
277
278 let settings = self.settings.clone();
279 let (tx_main_package, rx_main_package) = channel::unbounded::<Package>();
280 let tx_for_runtime = tx_main_package.clone();
281 let dispatch_for_runtime = dispatch.clone();
282
283 let runtime_handle = tokio::spawn(async move {
284 tracing::dispatcher::with_default(&dispatch_for_runtime, || {
285 Runtime::run_script(
286 tx_for_runtime,
287 rx_main_package,
288 loader,
289 dispatch_for_runtime.clone(),
290 settings,
291 context_for_runtime,
292 )
293 })
294 .await
295 });
296
297 self.prepared = Some(PreparedRuntime {
298 tx_main_package,
299 dispatch,
300 runtime_handle,
301 guard,
302 app_name,
303 request_data,
304 auto_start,
305 });
306
307 Ok(())
308 }
309
310 pub async fn run(&mut self) -> Result<Value, PhlowRuntimeError> {
318 self.build().await?;
319
320 let auto_start = match self.prepared.as_ref() {
321 Some(prepared) => prepared.auto_start,
322 None => return Err(PhlowRuntimeError::MissingPipeline),
323 };
324
325 if !auto_start {
326 self.shutdown().await?;
327 return Ok(Value::Undefined);
328 }
329
330 let (tx_main_package, dispatch, app_name, request_data) = match self.prepared.as_ref() {
331 Some(prepared) => (
332 prepared.tx_main_package.clone(),
333 prepared.dispatch.clone(),
334 prepared.app_name.clone(),
335 prepared.request_data.clone(),
336 ),
337 None => return Err(PhlowRuntimeError::MissingPipeline),
338 };
339
340 let (response_tx, response_rx) = tokio::sync::oneshot::channel::<Value>();
341 let package = tracing::dispatcher::with_default(&dispatch, || {
342 let span = tracing::span!(
343 tracing::Level::INFO,
344 "phlow_run",
345 otel.name = app_name.as_str()
346 );
347
348 Package {
349 response: Some(response_tx),
350 request_data,
351 origin: 0,
352 span: Some(span),
353 dispatch: Some(dispatch.clone()),
354 }
355 });
356
357 if tx_main_package.send(package).is_err() {
358 return Err(PhlowRuntimeError::PackageSendError);
359 }
360
361 let result = response_rx
362 .await
363 .map_err(|_| PhlowRuntimeError::ResponseChannelClosed)?;
364
365 Ok(result)
366 }
367
368 pub async fn shutdown(&mut self) -> Result<(), PhlowRuntimeError> {
373 let prepared = match self.prepared.take() {
374 Some(prepared) => prepared,
375 None => return Ok(()),
376 };
377
378 drop(prepared.tx_main_package);
379
380 let runtime_result = prepared
381 .runtime_handle
382 .await
383 .map_err(PhlowRuntimeError::RuntimeJoinError)?;
384 runtime_result?;
385
386 drop(prepared.guard);
387
388 Ok(())
389 }
390}
391
392impl PhlowBuilder {
393 pub fn new() -> Self {
397 let mut settings = Settings::for_runtime();
398 if settings.var_main.is_none() {
399 settings.var_main = Some("__phlow_runtime__".to_string());
400 }
401
402 Self {
403 pipeline: None,
404 context: None,
405 settings,
406 base_path: None,
407 dispatch: None,
408 }
409 }
410
411 pub fn with_settings(settings: Settings) -> Self {
413 Self {
414 pipeline: None,
415 context: None,
416 settings,
417 base_path: None,
418 dispatch: None,
419 }
420 }
421
422 pub fn set_pipeline(mut self, pipeline: Value) -> Self {
426 self.pipeline = Some(pipeline);
427 self
428 }
429
430 pub fn set_context(mut self, context: Context) -> Self {
434 self.context = Some(context);
435 self
436 }
437
438 pub fn set_settings(mut self, settings: Settings) -> Self {
442 self.settings = settings;
443 self
444 }
445
446 pub fn set_base_path<P: Into<PathBuf>>(mut self, base_path: P) -> Self {
450 self.base_path = Some(base_path.into());
451 self
452 }
453
454 pub fn set_dispatch(mut self, dispatch: tracing::Dispatch) -> Self {
458 self.dispatch = Some(dispatch);
459 self
460 }
461
462 pub fn settings(&self) -> &Settings {
464 &self.settings
465 }
466
467 pub fn settings_mut(&mut self) -> &mut Settings {
469 &mut self.settings
470 }
471
472 pub async fn build(mut self) -> Result<PhlowRuntime, PhlowRuntimeError> {
476 let mut runtime = PhlowRuntime::with_settings(self.settings);
477
478 if let Some(pipeline) = self.pipeline.take() {
479 runtime.set_pipeline(pipeline);
480 }
481
482 if let Some(context) = self.context.take() {
483 runtime.set_context(context);
484 }
485
486 if let Some(base_path) = self.base_path.take() {
487 runtime.set_base_path(base_path);
488 }
489
490 if let Some(dispatch) = self.dispatch.take() {
491 runtime.set_dispatch(dispatch);
492 }
493
494 runtime.build().await?;
495 Ok(runtime)
496 }
497}
498
499impl Default for PhlowBuilder {
500 fn default() -> Self {
501 Self::new()
502 }
503}
504
505struct PreparedRuntime {
506 tx_main_package: channel::Sender<Package>,
507 dispatch: tracing::Dispatch,
508 runtime_handle: tokio::task::JoinHandle<Result<(), RuntimeError>>,
509 guard: Option<OtelGuard>,
510 app_name: String,
511 request_data: Option<Value>,
512 auto_start: bool,
513}