1use asupersync::runtime::{Runtime, RuntimeBuilder};
29use pureflow_core::{
30 CancellationError, CancellationHandle, PureflowError, NodeExecutor, PortsIn, PortsOut, Result,
31 context::{CancellationState, NodeContext},
32 lifecycle::{LifecycleEvent, LifecycleEventKind, LifecycleHook, NoopLifecycleHook},
33 metadata::{ErrorMetadataRecord, MetadataRecord, MetadataSink, NoopMetadataSink},
34};
35use std::future::Future;
36use std::sync::Arc;
37
38pub struct AsupersyncRuntime {
40 runtime: Runtime,
41}
42
43impl AsupersyncRuntime {
44 pub fn new() -> Result<Self> {
50 Self::from_builder(RuntimeBuilder::new())
51 }
52
53 fn from_builder(builder: RuntimeBuilder) -> Result<Self> {
54 let runtime: Runtime = match builder.build() {
55 Ok(runtime) => runtime,
56 Err(err) => {
57 return Err(PureflowError::execution(format!(
58 "failed to build asupersync runtime: {err}"
59 )));
60 }
61 };
62
63 Ok(Self { runtime })
64 }
65
66 pub fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
68 self.runtime.block_on(future)
69 }
70
71 #[cfg(test)]
72 fn deterministic_for_tests() -> Result<Self> {
73 Self::from_builder(RuntimeBuilder::current_thread().poll_budget(1))
74 }
75
76 #[must_use]
78 pub fn cancellation_handle() -> CancellationHandle {
79 CancellationHandle::new()
80 }
81
82 pub fn run_node<E: NodeExecutor + ?Sized>(
89 &self,
90 node: &E,
91 ctx: NodeContext,
92 inputs: PortsIn,
93 outputs: PortsOut,
94 ) -> Result<()> {
95 if let Some(err) = cancellation_error(&ctx) {
96 return Err(err);
97 }
98
99 self.runtime.block_on(run_node(node, ctx, inputs, outputs))
100 }
101
102 pub fn run_node_with_cancellation_handle<E: NodeExecutor + ?Sized>(
109 &self,
110 node: &E,
111 ctx: NodeContext,
112 inputs: PortsIn,
113 outputs: PortsOut,
114 cancellation: &CancellationHandle,
115 ) -> Result<()> {
116 let ctx: NodeContext = ctx.with_cancellation_token(cancellation.token());
117 if let Some(err) = cancellation_error(&ctx) {
118 return Err(err);
119 }
120
121 self.runtime.block_on(run_node(node, ctx, inputs, outputs))
122 }
123
124 pub fn run_node_with_metadata_sink<E, M>(
132 &self,
133 node: &E,
134 ctx: NodeContext,
135 inputs: PortsIn,
136 outputs: PortsOut,
137 metadata_sink: Arc<M>,
138 ) -> Result<()>
139 where
140 E: NodeExecutor + ?Sized,
141 M: MetadataSink + 'static,
142 {
143 if let Some(err) = cancellation_error(&ctx) {
144 return Err(err);
145 }
146
147 self.runtime.block_on(run_node_with_metadata_sink(
148 node,
149 ctx,
150 inputs,
151 outputs,
152 metadata_sink,
153 ))
154 }
155}
156
157pub async fn run_node<E: NodeExecutor + ?Sized>(
164 node: &E,
165 ctx: NodeContext,
166 inputs: PortsIn,
167 outputs: PortsOut,
168) -> Result<()> {
169 run_node_with_observers(
170 node,
171 ctx,
172 inputs,
173 outputs,
174 &NoopLifecycleHook,
175 Arc::new(NoopMetadataSink),
176 )
177 .await
178}
179
180pub async fn run_node_with_hook<E, H>(
187 node: &E,
188 ctx: NodeContext,
189 inputs: PortsIn,
190 outputs: PortsOut,
191 hook: &H,
192) -> Result<()>
193where
194 E: NodeExecutor + ?Sized,
195 H: LifecycleHook + ?Sized,
196{
197 run_node_with_observers(node, ctx, inputs, outputs, hook, Arc::new(NoopMetadataSink)).await
198}
199
200pub async fn run_node_with_metadata_sink<E, M>(
207 node: &E,
208 ctx: NodeContext,
209 inputs: PortsIn,
210 outputs: PortsOut,
211 metadata_sink: Arc<M>,
212) -> Result<()>
213where
214 E: NodeExecutor + ?Sized,
215 M: MetadataSink + 'static,
216{
217 run_node_with_observers(
218 node,
219 ctx,
220 inputs,
221 outputs,
222 &NoopLifecycleHook,
223 metadata_sink,
224 )
225 .await
226}
227
228pub async fn run_node_with_observers<E, H, M>(
235 node: &E,
236 ctx: NodeContext,
237 inputs: PortsIn,
238 outputs: PortsOut,
239 hook: &H,
240 metadata_sink: Arc<M>,
241) -> Result<()>
242where
243 E: NodeExecutor + ?Sized,
244 H: LifecycleHook + ?Sized,
245 M: MetadataSink + 'static,
246{
247 let metadata_sink: Arc<dyn MetadataSink + Send + Sync> = metadata_sink.clone();
248 let inputs: PortsIn = inputs
249 .with_metadata_sink(metadata_sink.clone())
250 .with_node_context(ctx.clone());
251 let outputs: PortsOut = outputs
252 .with_metadata_sink(metadata_sink.clone())
253 .with_node_context(ctx.clone());
254 observe_lifecycle(
255 hook,
256 metadata_sink.as_ref(),
257 LifecycleEventKind::NodeStarted,
258 ctx.clone(),
259 )?;
260
261 let result: Result<()> = node.run(ctx.clone(), inputs, outputs).await;
262 let terminal_observation: Result<()> = match &result {
263 Ok(()) => observe_lifecycle(
264 hook,
265 metadata_sink.as_ref(),
266 LifecycleEventKind::NodeCompleted,
267 ctx,
268 ),
269 Err(err) => {
270 let error_observation: Result<()> =
271 observe_node_error(metadata_sink.as_ref(), &ctx, err.clone());
272 let lifecycle_kind: LifecycleEventKind = if matches!(err, PureflowError::Cancellation(_))
273 {
274 LifecycleEventKind::NodeCancelled
275 } else {
276 LifecycleEventKind::NodeFailed
277 };
278 let lifecycle_observation: Result<()> =
279 observe_lifecycle(hook, metadata_sink.as_ref(), lifecycle_kind, ctx);
280 error_observation.and(lifecycle_observation)
281 }
282 };
283
284 match (result, terminal_observation) {
285 (Ok(()), Ok(())) => Ok(()),
286 (Ok(()), Err(err)) | (Err(err), _) => Err(err),
287 }
288}
289
290fn observe_node_error<M>(metadata_sink: &M, ctx: &NodeContext, err: PureflowError) -> Result<()>
291where
292 M: MetadataSink + ?Sized,
293{
294 let record: MetadataRecord = MetadataRecord::Error(ErrorMetadataRecord::node_failed(ctx, err));
295 emit_metadata_trace(&record);
296 metadata_sink.record(&record)
297}
298
299fn observe_lifecycle<H, M>(
300 hook: &H,
301 metadata_sink: &M,
302 kind: LifecycleEventKind,
303 ctx: NodeContext,
304) -> Result<()>
305where
306 H: LifecycleHook + ?Sized,
307 M: MetadataSink + ?Sized,
308{
309 let event: LifecycleEvent = LifecycleEvent::new(kind, ctx);
310 emit_lifecycle_trace(&event);
311 let record: MetadataRecord = MetadataRecord::Lifecycle(event.clone());
312 emit_metadata_trace(&record);
313 metadata_sink.record(&record)?;
314 hook.observe(&event)
315}
316
317fn cancellation_error(ctx: &NodeContext) -> Option<PureflowError> {
318 match ctx.cancellation() {
319 CancellationState::Active => None,
320 CancellationState::Requested(request) => {
321 emit_cancellation_trace(ctx, request.reason());
322 Some(PureflowError::from(CancellationError::new(request.reason())))
323 }
324 }
325}
326
327#[cfg(feature = "tracing")]
328fn emit_lifecycle_trace(event: &LifecycleEvent) {
329 let ctx: &NodeContext = event.context();
330 tracing::info!(
331 target: "pureflow.runtime.lifecycle",
332 kind = lifecycle_event_kind_label(event.kind()),
333 workflow_id = %ctx.workflow_id(),
334 node_id = %ctx.node_id(),
335 execution_id = %ctx.execution().execution_id(),
336 attempt = ctx.execution().attempt().get(),
337 "runtime lifecycle event"
338 );
339}
340
341#[cfg(not(feature = "tracing"))]
342const fn emit_lifecycle_trace(_event: &LifecycleEvent) {}
343
344#[cfg(feature = "tracing")]
345fn emit_metadata_trace(record: &MetadataRecord) {
346 tracing::debug!(
347 target: "pureflow.runtime.metadata",
348 record_type = metadata_record_kind_label(record),
349 "runtime metadata record emitted"
350 );
351}
352
353#[cfg(not(feature = "tracing"))]
354const fn emit_metadata_trace(_record: &MetadataRecord) {}
355
356#[cfg(feature = "tracing")]
357fn emit_cancellation_trace(ctx: &NodeContext, reason: &str) {
358 tracing::warn!(
359 target: "pureflow.runtime.cancellation",
360 workflow_id = %ctx.workflow_id(),
361 node_id = %ctx.node_id(),
362 execution_id = %ctx.execution().execution_id(),
363 attempt = ctx.execution().attempt().get(),
364 reason,
365 "runtime cancellation observed"
366 );
367}
368
369#[cfg(not(feature = "tracing"))]
370const fn emit_cancellation_trace(_ctx: &NodeContext, _reason: &str) {}
371
/// Maps a lifecycle event kind to the snake_case label used in trace output.
///
/// The match has no wildcard arm, so adding a variant forces this table to be
/// updated.
#[cfg(feature = "tracing")]
const fn lifecycle_event_kind_label(kind: LifecycleEventKind) -> &'static str {
    use LifecycleEventKind as Kind;

    match kind {
        Kind::NodeScheduled => "node_scheduled",
        Kind::NodeStarted => "node_started",
        Kind::NodeCompleted => "node_completed",
        Kind::NodeFailed => "node_failed",
        Kind::NodeCancelled => "node_cancelled",
    }
}
382
/// Maps a metadata record to the snake_case label of its variant, ignoring the
/// payload.
///
/// The match has no wildcard arm, so adding a variant forces this table to be
/// updated.
#[cfg(feature = "tracing")]
const fn metadata_record_kind_label(record: &MetadataRecord) -> &'static str {
    use MetadataRecord as Record;

    match record {
        Record::ExecutionContext(_) => "execution_context",
        Record::Lifecycle(_) => "lifecycle",
        Record::Message(_) => "message",
        Record::QueuePressure(_) => "queue_pressure",
        Record::Error(_) => "error",
        Record::ExternalEffect(_) => "external_effect",
    }
}
394
/// Unit tests for the blocking runtime wrapper and the async node runners.
#[cfg(test)]
mod tests {
    use super::*;
    use std::future::{Future, Ready, ready};
    use std::pin::Pin;
    use std::sync::Mutex;

    use futures::executor::block_on;
    use pureflow_core::{
        CancellationError, ErrorMetadataKind, LifecycleError, MetadataError, PureflowError,
        context::CancellationRequest, lifecycle::LifecycleEventKind,
    };
    use pureflow_test_kit::{
        FailingExecutor, RecordingExecutor, execution_metadata, node_id, workflow_id,
    };

    /// Lifecycle hook that remembers the kind of every event it observes.
    #[derive(Debug, Default)]
    struct RecordingHook {
        events: Mutex<Vec<LifecycleEventKind>>,
    }

    impl RecordingHook {
        /// Snapshot of the observed event kinds, in observation order.
        fn recorded(&self) -> Vec<LifecycleEventKind> {
            let seen = self
                .events
                .lock()
                .expect("recording hook lock should not be poisoned");
            seen.clone()
        }
    }

    impl LifecycleHook for RecordingHook {
        fn observe(&self, event: &LifecycleEvent) -> Result<()> {
            let mut seen = self
                .events
                .lock()
                .expect("recording hook lock should not be poisoned");
            seen.push(event.kind());
            Ok(())
        }
    }

    /// Lifecycle hook that rejects every event.
    #[derive(Debug)]
    struct FailingHook;

    impl LifecycleHook for FailingHook {
        fn observe(&self, _event: &LifecycleEvent) -> Result<()> {
            let failure = LifecycleError::new("hook failed");
            Err(PureflowError::from(failure))
        }
    }

    /// Executor that immediately resolves to a cancellation error.
    #[derive(Debug)]
    struct CancelledExecutor;

    impl NodeExecutor for CancelledExecutor {
        type RunFuture<'a> = Ready<Result<()>>;

        fn run(
            &self,
            _ctx: NodeContext,
            _inputs: PortsIn,
            _outputs: PortsOut,
        ) -> Self::RunFuture<'_> {
            let outcome = Err(PureflowError::cancelled("planned shutdown"));
            ready(outcome)
        }
    }

    /// Metadata sink that keeps only the kinds of lifecycle records.
    #[derive(Debug, Default)]
    struct RecordingMetadataSink {
        events: Mutex<Vec<LifecycleEventKind>>,
    }

    impl RecordingMetadataSink {
        /// Snapshot of the recorded lifecycle kinds, in recording order.
        fn recorded(&self) -> Vec<LifecycleEventKind> {
            let seen = self
                .events
                .lock()
                .expect("metadata sink lock should not be poisoned");
            seen.clone()
        }
    }

    impl MetadataSink for RecordingMetadataSink {
        fn record(&self, record: &MetadataRecord) -> Result<()> {
            // Non-lifecycle records are accepted but not stored.
            if let MetadataRecord::Lifecycle(event) = record {
                let mut seen = self
                    .events
                    .lock()
                    .expect("metadata sink lock should not be poisoned");
                seen.push(event.kind());
            }
            Ok(())
        }
    }

    /// Metadata sink that stores a clone of every record it receives.
    #[derive(Debug, Default)]
    struct RecordingAllMetadataSink {
        records: Mutex<Vec<MetadataRecord>>,
    }

    impl RecordingAllMetadataSink {
        /// Snapshot of all stored records, in recording order.
        fn records(&self) -> Vec<MetadataRecord> {
            let stored = self
                .records
                .lock()
                .expect("metadata sink lock should not be poisoned");
            stored.clone()
        }
    }

    impl MetadataSink for RecordingAllMetadataSink {
        fn record(&self, record: &MetadataRecord) -> Result<()> {
            let mut stored = self
                .records
                .lock()
                .expect("metadata sink lock should not be poisoned");
            stored.push(record.clone());
            Ok(())
        }
    }

    /// Metadata sink that rejects every record.
    #[derive(Debug)]
    struct FailingMetadataSink;

    impl MetadataSink for FailingMetadataSink {
        fn record(&self, _record: &MetadataRecord) -> Result<()> {
            let failure = MetadataError::new("metadata sink failed");
            Err(PureflowError::from(failure))
        }
    }

    /// Metadata sink that rejects only the `NodeFailed` lifecycle record.
    #[derive(Debug)]
    struct FailingOnNodeFailedMetadataSink;

    impl MetadataSink for FailingOnNodeFailedMetadataSink {
        fn record(&self, record: &MetadataRecord) -> Result<()> {
            match record {
                MetadataRecord::Lifecycle(event)
                    if event.kind() == LifecycleEventKind::NodeFailed =>
                {
                    Err(PureflowError::from(MetadataError::new(
                        "terminal metadata failed",
                    )))
                }
                _ => Ok(()),
            }
        }
    }

    /// Executor that cancels a shared handle mid-run and records what its
    /// context reported before and after the request.
    #[derive(Debug)]
    struct CancellingExecutor {
        handle: CancellationHandle,
        observations: Mutex<Vec<bool>>,
    }

    impl CancellingExecutor {
        fn new(handle: CancellationHandle) -> Self {
            let observations = Mutex::new(Vec::new());
            Self {
                handle,
                observations,
            }
        }

        /// Appends one `is_cancelled` observation.
        fn record(&self, value: bool) {
            let mut log = self
                .observations
                .lock()
                .expect("cancelling executor observations lock should not be poisoned");
            log.push(value);
        }

        /// Snapshot of the observations made so far.
        fn observations(&self) -> Vec<bool> {
            let log = self
                .observations
                .lock()
                .expect("cancelling executor observations lock should not be poisoned");
            log.clone()
        }
    }

    impl NodeExecutor for CancellingExecutor {
        type RunFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;

        fn run(
            &self,
            ctx: NodeContext,
            _inputs: PortsIn,
            _outputs: PortsOut,
        ) -> Self::RunFuture<'_> {
            Box::pin(async move {
                self.record(ctx.is_cancelled());

                let request = CancellationRequest::new("runtime supervisor stopped node");
                let _first_request = self.handle.cancel(request);

                self.record(ctx.is_cancelled());
                Ok(())
            })
        }
    }

    /// Context shared by all tests: workflow `flow`, node `node`, run `run-1`.
    fn context() -> NodeContext {
        NodeContext::new(
            workflow_id("flow"),
            node_id("node"),
            execution_metadata("run-1"),
        )
    }

    /// Builds the deterministic test runtime, panicking if construction fails.
    fn deterministic_runtime() -> AsupersyncRuntime {
        AsupersyncRuntime::deterministic_for_tests().expect("deterministic runtime should build")
    }

    #[test]
    fn deterministic_runtime_for_tests_uses_current_thread_config() {
        let wrapper = deterministic_runtime();

        assert_eq!(wrapper.runtime.config().worker_threads, 1);
        assert_eq!(wrapper.runtime.config().poll_budget, 1);
    }

    #[test]
    fn production_runtime_builder_defaults_remain_separate_from_test_runtime() {
        let production = AsupersyncRuntime::new().expect("production runtime should build");
        let deterministic = deterministic_runtime();

        assert_eq!(production.runtime.config().poll_budget, 128);
        assert_eq!(deterministic.runtime.config().worker_threads, 1);
        assert_eq!(deterministic.runtime.config().poll_budget, 1);
    }

    #[test]
    fn asupersync_runtime_runs_one_node() {
        let runtime = deterministic_runtime();
        let executor = RecordingExecutor::default();

        let outcome = runtime.run_node(
            &executor,
            context(),
            PortsIn::default(),
            PortsOut::default(),
        );
        outcome.expect("execution should succeed");

        assert_eq!(executor.visited_node_names(), vec!["node"]);
    }

    #[test]
    fn asupersync_runtime_preserves_executor_failures() {
        let runtime = deterministic_runtime();
        let executor = FailingExecutor::execution("boom");

        let outcome = runtime.run_node(
            &executor,
            context(),
            PortsIn::default(),
            PortsOut::default(),
        );
        let err = outcome.expect_err("execution should fail");

        assert_eq!(err, PureflowError::execution("boom"));
    }

    #[test]
    fn asupersync_runtime_rejects_pre_cancelled_contexts() {
        let runtime = deterministic_runtime();
        let executor = RecordingExecutor::default();
        let cancelled_ctx =
            context().with_cancellation(CancellationRequest::new("shutdown requested"));

        let err = runtime
            .run_node(
                &executor,
                cancelled_ctx,
                PortsIn::default(),
                PortsOut::default(),
            )
            .expect_err("cancelled execution should not run");

        assert_eq!(
            err,
            PureflowError::from(CancellationError::new("shutdown requested"))
        );
        assert!(executor.visited_contexts().is_empty());
    }

    #[test]
    fn asupersync_runtime_cancellation_handle_is_visible_inside_running_node() {
        let runtime = deterministic_runtime();
        let handle = AsupersyncRuntime::cancellation_handle();
        let executor = CancellingExecutor::new(handle.clone());

        let outcome = runtime.run_node_with_cancellation_handle(
            &executor,
            context(),
            PortsIn::default(),
            PortsOut::default(),
            &handle,
        );
        outcome.expect("execution should succeed");

        assert_eq!(executor.observations(), [false, true]);
        assert!(handle.is_cancelled());
    }

    #[test]
    fn asupersync_runtime_rejects_child_context_after_shared_cancellation() {
        let runtime = deterministic_runtime();
        let handle = AsupersyncRuntime::cancellation_handle();
        let canceller = CancellingExecutor::new(handle.clone());
        let child = RecordingExecutor::default();

        let first = runtime.run_node_with_cancellation_handle(
            &canceller,
            context(),
            PortsIn::default(),
            PortsOut::default(),
            &handle,
        );
        first.expect("first execution should request cancellation");

        let second = runtime.run_node_with_cancellation_handle(
            &child,
            context(),
            PortsIn::default(),
            PortsOut::default(),
            &handle,
        );
        let err = second.expect_err("shared cancellation should reject child execution");

        assert_eq!(
            err,
            PureflowError::from(CancellationError::new("runtime supervisor stopped node"))
        );
        assert!(child.visited_contexts().is_empty());
    }

    #[test]
    fn run_node_with_hook_emits_started_then_completed() {
        let executor = RecordingExecutor::default();
        let hook = RecordingHook::default();

        let outcome = block_on(run_node_with_hook(
            &executor,
            context(),
            PortsIn::default(),
            PortsOut::default(),
            &hook,
        ));
        outcome.expect("execution should succeed");

        assert_eq!(
            hook.recorded(),
            [
                LifecycleEventKind::NodeStarted,
                LifecycleEventKind::NodeCompleted,
            ]
        );
    }

    #[test]
    fn run_node_with_hook_emits_started_then_failed_and_preserves_executor_error() {
        let executor = FailingExecutor::execution("boom");
        let hook = RecordingHook::default();

        let outcome = block_on(run_node_with_hook(
            &executor,
            context(),
            PortsIn::default(),
            PortsOut::default(),
            &hook,
        ));
        let err = outcome.expect_err("execution should fail");

        assert_eq!(
            hook.recorded(),
            [
                LifecycleEventKind::NodeStarted,
                LifecycleEventKind::NodeFailed,
            ]
        );
        assert_eq!(err, PureflowError::execution("boom"));
    }

    #[test]
    fn run_node_provides_noop_default_hook() {
        let executor = RecordingExecutor::default();

        let outcome = block_on(run_node(
            &executor,
            context(),
            PortsIn::default(),
            PortsOut::default(),
        ));
        outcome.expect("execution should succeed");
    }

    #[test]
    fn run_node_with_hook_propagates_hook_failures() {
        let executor = RecordingExecutor::default();

        let outcome = block_on(run_node_with_hook(
            &executor,
            context(),
            PortsIn::default(),
            PortsOut::default(),
            &FailingHook,
        ));
        let err = outcome.expect_err("hook failure should surface");

        assert_eq!(err, PureflowError::from(LifecycleError::new("hook failed")));
    }

    #[test]
    fn run_node_with_metadata_sink_records_lifecycle_events() {
        let executor = RecordingExecutor::default();
        let sink = Arc::new(RecordingMetadataSink::default());

        let outcome = block_on(run_node_with_metadata_sink(
            &executor,
            context(),
            PortsIn::default(),
            PortsOut::default(),
            sink.clone(),
        ));
        outcome.expect("execution should succeed");

        assert_eq!(
            sink.recorded(),
            [
                LifecycleEventKind::NodeStarted,
                LifecycleEventKind::NodeCompleted,
            ]
        );
    }

    #[test]
    fn run_node_with_metadata_sink_records_cancelled_lifecycle_for_cancellation_errors() {
        let executor = CancelledExecutor;
        let sink = Arc::new(RecordingMetadataSink::default());

        let outcome = block_on(run_node_with_metadata_sink(
            &executor,
            context(),
            PortsIn::default(),
            PortsOut::default(),
            sink.clone(),
        ));
        let err = outcome.expect_err("cancelled execution should fail at the node boundary");

        assert_eq!(err, PureflowError::cancelled("planned shutdown"));
        assert_eq!(
            sink.recorded(),
            [
                LifecycleEventKind::NodeStarted,
                LifecycleEventKind::NodeCancelled,
            ]
        );
    }

    #[test]
    fn run_node_with_metadata_sink_records_node_error_metadata() {
        let executor = FailingExecutor::execution("boom");
        let sink = Arc::new(RecordingAllMetadataSink::default());

        let outcome = block_on(run_node_with_metadata_sink(
            &executor,
            context(),
            PortsIn::default(),
            PortsOut::default(),
            sink.clone(),
        ));
        let err = outcome.expect_err("execution should fail");

        let records = sink.records();
        let error_record = records
            .iter()
            .find_map(|record| match record {
                MetadataRecord::Error(error) => Some(error),
                _ => None,
            })
            .expect("node error metadata should be recorded");

        assert_eq!(err, PureflowError::execution("boom"));
        assert_eq!(error_record.kind(), ErrorMetadataKind::NodeFailed);
        assert_eq!(
            error_record
                .node_id()
                .expect("node error should include node id")
                .as_str(),
            "node"
        );
        assert_eq!(error_record.error(), &PureflowError::execution("boom"));
    }

    #[cfg(feature = "tracing")]
    #[test]
    fn tracing_feature_uses_stable_runtime_labels() {
        let started = LifecycleEvent::new(LifecycleEventKind::NodeStarted, context());
        let lifecycle_record = MetadataRecord::Lifecycle(started);
        let error_record = MetadataRecord::Error(ErrorMetadataRecord::node_failed(
            &context(),
            PureflowError::execution("boom"),
        ));

        assert_eq!(
            lifecycle_event_kind_label(LifecycleEventKind::NodeStarted),
            "node_started"
        );
        assert_eq!(metadata_record_kind_label(&lifecycle_record), "lifecycle");
        assert_eq!(metadata_record_kind_label(&error_record), "error");
    }

    #[test]
    fn asupersync_runtime_can_collect_metadata() {
        let runtime = deterministic_runtime();
        let executor = RecordingExecutor::default();
        let sink = Arc::new(RecordingMetadataSink::default());

        let outcome = runtime.run_node_with_metadata_sink(
            &executor,
            context(),
            PortsIn::default(),
            PortsOut::default(),
            sink.clone(),
        );
        outcome.expect("execution should succeed");

        assert_eq!(
            sink.recorded(),
            [
                LifecycleEventKind::NodeStarted,
                LifecycleEventKind::NodeCompleted,
            ]
        );
    }

    #[test]
    fn run_node_with_metadata_sink_propagates_start_collection_failures() {
        let executor = RecordingExecutor::default();

        let outcome = block_on(run_node_with_metadata_sink(
            &executor,
            context(),
            PortsIn::default(),
            PortsOut::default(),
            Arc::new(FailingMetadataSink),
        ));
        let err = outcome.expect_err("metadata failure should surface");

        assert_eq!(
            err,
            PureflowError::from(MetadataError::new("metadata sink failed"))
        );
        assert!(executor.visited_contexts().is_empty());
    }

    #[test]
    fn executor_failure_takes_precedence_over_terminal_metadata_failure() {
        let executor = FailingExecutor::execution("boom");

        let outcome = block_on(run_node_with_metadata_sink(
            &executor,
            context(),
            PortsIn::default(),
            PortsOut::default(),
            Arc::new(FailingOnNodeFailedMetadataSink),
        ));
        let err = outcome.expect_err("executor failure should surface");

        assert_eq!(err, PureflowError::execution("boom"));
    }
}