Skip to main content

pureflow_runtime/
lib.rs

1//! Runtime mechanics such as supervision and backpressure primitives.
2//!
3//! ## Fragment: runtime-asupersync-bootstrap
4//!
5//! `pureflow-runtime` now owns a real runtime substrate, but only at the level
6//! the current node interface can honestly support. The wrapper below uses
7//! `asupersync::runtime::RuntimeBuilder` to provide a task-tree-backed entry
8//! point for one node execution at a time. It deliberately does not claim
9//! workflow scheduling, channel wiring, or full FBP semantics yet.
10//!
11//! ## Fragment: runtime-asupersync-boundary
12//!
13//! `asupersync` is the runtime substrate, not the public FBP model. Pureflow
14//! owns graph validation, node contracts, port handles, metadata, capability
15//! descriptors, and introspection. Runtime adapters may use `asupersync`
16//! contexts, bounded channels, cancellation, and task handles internally, but
17//! those types should not leak through `NodeExecutor`, `NodeContext`,
18//! `PortsIn`, or `PortsOut` unless a later bead explicitly revisits the
19//! boundary.
20//!
21//! ## Fragment: runtime-test-determinism
22//!
23//! Production construction intentionally keeps the default `asupersync`
24//! builder. Tests that assert ordering, cancellation, or failure propagation
25//! should use the deterministic current-thread constructor so those checks are
26//! about Pureflow behavior rather than host scheduler timing.
27
28use asupersync::runtime::{Runtime, RuntimeBuilder};
29use pureflow_core::{
30    CancellationError, CancellationHandle, PureflowError, NodeExecutor, PortsIn, PortsOut, Result,
31    context::{CancellationState, NodeContext},
32    lifecycle::{LifecycleEvent, LifecycleEventKind, LifecycleHook, NoopLifecycleHook},
33    metadata::{ErrorMetadataRecord, MetadataRecord, MetadataSink, NoopMetadataSink},
34};
35use std::future::Future;
36use std::sync::Arc;
37
38/// Narrow runtime wrapper backed by `asupersync`.
39pub struct AsupersyncRuntime {
40    runtime: Runtime,
41}
42
43impl AsupersyncRuntime {
44    /// Build the current `asupersync`-backed runtime wrapper.
45    ///
46    /// # Errors
47    ///
48    /// Returns an error if the underlying runtime cannot be constructed.
49    pub fn new() -> Result<Self> {
50        Self::from_builder(RuntimeBuilder::new())
51    }
52
53    fn from_builder(builder: RuntimeBuilder) -> Result<Self> {
54        let runtime: Runtime = match builder.build() {
55            Ok(runtime) => runtime,
56            Err(err) => {
57                return Err(PureflowError::execution(format!(
58                    "failed to build asupersync runtime: {err}"
59                )));
60            }
61        };
62
63        Ok(Self { runtime })
64    }
65
66    /// Run a future on the owned runtime.
67    pub fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
68        self.runtime.block_on(future)
69    }
70
71    #[cfg(test)]
72    fn deterministic_for_tests() -> Result<Self> {
73        Self::from_builder(RuntimeBuilder::current_thread().poll_budget(1))
74    }
75
76    /// Create a Pureflow-owned cancellation handle for runtime-managed contexts.
77    #[must_use]
78    pub fn cancellation_handle() -> CancellationHandle {
79        CancellationHandle::new()
80    }
81
82    /// Execute one node on the owned runtime.
83    ///
84    /// # Errors
85    ///
86    /// Returns an error if cancellation was already requested, lifecycle
87    /// observation fails, or the node executor reports one.
88    pub fn run_node<E: NodeExecutor + ?Sized>(
89        &self,
90        node: &E,
91        ctx: NodeContext,
92        inputs: PortsIn,
93        outputs: PortsOut,
94    ) -> Result<()> {
95        if let Some(err) = cancellation_error(&ctx) {
96            return Err(err);
97        }
98
99        self.runtime.block_on(run_node(node, ctx, inputs, outputs))
100    }
101
102    /// Execute one node with an externally drivable cancellation handle.
103    ///
104    /// # Errors
105    ///
106    /// Returns an error if cancellation was already requested or the node
107    /// executor reports one.
108    pub fn run_node_with_cancellation_handle<E: NodeExecutor + ?Sized>(
109        &self,
110        node: &E,
111        ctx: NodeContext,
112        inputs: PortsIn,
113        outputs: PortsOut,
114        cancellation: &CancellationHandle,
115    ) -> Result<()> {
116        let ctx: NodeContext = ctx.with_cancellation_token(cancellation.token());
117        if let Some(err) = cancellation_error(&ctx) {
118            return Err(err);
119        }
120
121        self.runtime.block_on(run_node(node, ctx, inputs, outputs))
122    }
123
124    /// Execute one node on the owned runtime and collect metadata.
125    ///
126    /// # Errors
127    ///
128    /// Returns an error if cancellation was already requested, metadata
129    /// collection fails, lifecycle observation fails, or the node executor
130    /// reports one.
131    pub fn run_node_with_metadata_sink<E, M>(
132        &self,
133        node: &E,
134        ctx: NodeContext,
135        inputs: PortsIn,
136        outputs: PortsOut,
137        metadata_sink: Arc<M>,
138    ) -> Result<()>
139    where
140        E: NodeExecutor + ?Sized,
141        M: MetadataSink + 'static,
142    {
143        if let Some(err) = cancellation_error(&ctx) {
144            return Err(err);
145        }
146
147        self.runtime.block_on(run_node_with_metadata_sink(
148            node,
149            ctx,
150            inputs,
151            outputs,
152            metadata_sink,
153        ))
154    }
155}
156
157/// Execute a single node through the runtime boundary.
158///
159/// # Errors
160///
161/// Returns an error if lifecycle observation fails or the node executor
162/// reports one.
163pub async fn run_node<E: NodeExecutor + ?Sized>(
164    node: &E,
165    ctx: NodeContext,
166    inputs: PortsIn,
167    outputs: PortsOut,
168) -> Result<()> {
169    run_node_with_observers(
170        node,
171        ctx,
172        inputs,
173        outputs,
174        &NoopLifecycleHook,
175        Arc::new(NoopMetadataSink),
176    )
177    .await
178}
179
180/// Execute a single node through the runtime boundary and report lifecycle events.
181///
182/// # Errors
183///
184/// Returns an error if lifecycle observation fails or the node executor
185/// reports one.
186pub async fn run_node_with_hook<E, H>(
187    node: &E,
188    ctx: NodeContext,
189    inputs: PortsIn,
190    outputs: PortsOut,
191    hook: &H,
192) -> Result<()>
193where
194    E: NodeExecutor + ?Sized,
195    H: LifecycleHook + ?Sized,
196{
197    run_node_with_observers(node, ctx, inputs, outputs, hook, Arc::new(NoopMetadataSink)).await
198}
199
200/// Execute a single node through the runtime boundary and collect metadata.
201///
202/// # Errors
203///
204/// Returns an error if metadata collection fails or the node executor reports
205/// one.
206pub async fn run_node_with_metadata_sink<E, M>(
207    node: &E,
208    ctx: NodeContext,
209    inputs: PortsIn,
210    outputs: PortsOut,
211    metadata_sink: Arc<M>,
212) -> Result<()>
213where
214    E: NodeExecutor + ?Sized,
215    M: MetadataSink + 'static,
216{
217    run_node_with_observers(
218        node,
219        ctx,
220        inputs,
221        outputs,
222        &NoopLifecycleHook,
223        metadata_sink,
224    )
225    .await
226}
227
228/// Execute a node and report both lifecycle and metadata observations.
229///
230/// # Errors
231///
232/// Returns an error if start observation fails, terminal observation fails
233/// after successful execution, or the node executor reports one.
234pub async fn run_node_with_observers<E, H, M>(
235    node: &E,
236    ctx: NodeContext,
237    inputs: PortsIn,
238    outputs: PortsOut,
239    hook: &H,
240    metadata_sink: Arc<M>,
241) -> Result<()>
242where
243    E: NodeExecutor + ?Sized,
244    H: LifecycleHook + ?Sized,
245    M: MetadataSink + 'static,
246{
247    let metadata_sink: Arc<dyn MetadataSink + Send + Sync> = metadata_sink.clone();
248    let inputs: PortsIn = inputs
249        .with_metadata_sink(metadata_sink.clone())
250        .with_node_context(ctx.clone());
251    let outputs: PortsOut = outputs
252        .with_metadata_sink(metadata_sink.clone())
253        .with_node_context(ctx.clone());
254    observe_lifecycle(
255        hook,
256        metadata_sink.as_ref(),
257        LifecycleEventKind::NodeStarted,
258        ctx.clone(),
259    )?;
260
261    let result: Result<()> = node.run(ctx.clone(), inputs, outputs).await;
262    let terminal_observation: Result<()> = match &result {
263        Ok(()) => observe_lifecycle(
264            hook,
265            metadata_sink.as_ref(),
266            LifecycleEventKind::NodeCompleted,
267            ctx,
268        ),
269        Err(err) => {
270            let error_observation: Result<()> =
271                observe_node_error(metadata_sink.as_ref(), &ctx, err.clone());
272            let lifecycle_kind: LifecycleEventKind = if matches!(err, PureflowError::Cancellation(_))
273            {
274                LifecycleEventKind::NodeCancelled
275            } else {
276                LifecycleEventKind::NodeFailed
277            };
278            let lifecycle_observation: Result<()> =
279                observe_lifecycle(hook, metadata_sink.as_ref(), lifecycle_kind, ctx);
280            error_observation.and(lifecycle_observation)
281        }
282    };
283
284    match (result, terminal_observation) {
285        (Ok(()), Ok(())) => Ok(()),
286        (Ok(()), Err(err)) | (Err(err), _) => Err(err),
287    }
288}
289
290fn observe_node_error<M>(metadata_sink: &M, ctx: &NodeContext, err: PureflowError) -> Result<()>
291where
292    M: MetadataSink + ?Sized,
293{
294    let record: MetadataRecord = MetadataRecord::Error(ErrorMetadataRecord::node_failed(ctx, err));
295    emit_metadata_trace(&record);
296    metadata_sink.record(&record)
297}
298
299fn observe_lifecycle<H, M>(
300    hook: &H,
301    metadata_sink: &M,
302    kind: LifecycleEventKind,
303    ctx: NodeContext,
304) -> Result<()>
305where
306    H: LifecycleHook + ?Sized,
307    M: MetadataSink + ?Sized,
308{
309    let event: LifecycleEvent = LifecycleEvent::new(kind, ctx);
310    emit_lifecycle_trace(&event);
311    let record: MetadataRecord = MetadataRecord::Lifecycle(event.clone());
312    emit_metadata_trace(&record);
313    metadata_sink.record(&record)?;
314    hook.observe(&event)
315}
316
317fn cancellation_error(ctx: &NodeContext) -> Option<PureflowError> {
318    match ctx.cancellation() {
319        CancellationState::Active => None,
320        CancellationState::Requested(request) => {
321            emit_cancellation_trace(ctx, request.reason());
322            Some(PureflowError::from(CancellationError::new(request.reason())))
323        }
324    }
325}
326
327#[cfg(feature = "tracing")]
328fn emit_lifecycle_trace(event: &LifecycleEvent) {
329    let ctx: &NodeContext = event.context();
330    tracing::info!(
331        target: "pureflow.runtime.lifecycle",
332        kind = lifecycle_event_kind_label(event.kind()),
333        workflow_id = %ctx.workflow_id(),
334        node_id = %ctx.node_id(),
335        execution_id = %ctx.execution().execution_id(),
336        attempt = ctx.execution().attempt().get(),
337        "runtime lifecycle event"
338    );
339}
340
341#[cfg(not(feature = "tracing"))]
342const fn emit_lifecycle_trace(_event: &LifecycleEvent) {}
343
344#[cfg(feature = "tracing")]
345fn emit_metadata_trace(record: &MetadataRecord) {
346    tracing::debug!(
347        target: "pureflow.runtime.metadata",
348        record_type = metadata_record_kind_label(record),
349        "runtime metadata record emitted"
350    );
351}
352
353#[cfg(not(feature = "tracing"))]
354const fn emit_metadata_trace(_record: &MetadataRecord) {}
355
356#[cfg(feature = "tracing")]
357fn emit_cancellation_trace(ctx: &NodeContext, reason: &str) {
358    tracing::warn!(
359        target: "pureflow.runtime.cancellation",
360        workflow_id = %ctx.workflow_id(),
361        node_id = %ctx.node_id(),
362        execution_id = %ctx.execution().execution_id(),
363        attempt = ctx.execution().attempt().get(),
364        reason,
365        "runtime cancellation observed"
366    );
367}
368
369#[cfg(not(feature = "tracing"))]
370const fn emit_cancellation_trace(_ctx: &NodeContext, _reason: &str) {}
371
372#[cfg(feature = "tracing")]
373const fn lifecycle_event_kind_label(kind: LifecycleEventKind) -> &'static str {
374    match kind {
375        LifecycleEventKind::NodeScheduled => "node_scheduled",
376        LifecycleEventKind::NodeStarted => "node_started",
377        LifecycleEventKind::NodeCompleted => "node_completed",
378        LifecycleEventKind::NodeFailed => "node_failed",
379        LifecycleEventKind::NodeCancelled => "node_cancelled",
380    }
381}
382
383#[cfg(feature = "tracing")]
384const fn metadata_record_kind_label(record: &MetadataRecord) -> &'static str {
385    match record {
386        MetadataRecord::ExecutionContext(_) => "execution_context",
387        MetadataRecord::Lifecycle(_) => "lifecycle",
388        MetadataRecord::Message(_) => "message",
389        MetadataRecord::QueuePressure(_) => "queue_pressure",
390        MetadataRecord::Error(_) => "error",
391        MetadataRecord::ExternalEffect(_) => "external_effect",
392    }
393}
394
395#[cfg(test)]
396mod tests {
397    use super::*;
398    use std::future::{Future, Ready, ready};
399    use std::pin::Pin;
400    use std::sync::Mutex;
401
402    use pureflow_core::{
403        CancellationError, PureflowError, ErrorMetadataKind, LifecycleError, MetadataError,
404        context::CancellationRequest, lifecycle::LifecycleEventKind,
405    };
406    use pureflow_test_kit::{
407        FailingExecutor, RecordingExecutor, execution_metadata, node_id, workflow_id,
408    };
409    use futures::executor::block_on;
410
411    #[derive(Debug, Default)]
412    struct RecordingHook {
413        events: Mutex<Vec<LifecycleEventKind>>,
414    }
415
416    impl RecordingHook {
417        fn recorded(&self) -> Vec<LifecycleEventKind> {
418            self.events
419                .lock()
420                .expect("recording hook lock should not be poisoned")
421                .clone()
422        }
423    }
424
425    impl LifecycleHook for RecordingHook {
426        fn observe(&self, event: &LifecycleEvent) -> Result<()> {
427            self.events
428                .lock()
429                .expect("recording hook lock should not be poisoned")
430                .push(event.kind());
431            Ok(())
432        }
433    }
434
435    #[derive(Debug)]
436    struct FailingHook;
437
438    impl LifecycleHook for FailingHook {
439        fn observe(&self, _event: &LifecycleEvent) -> Result<()> {
440            Err(PureflowError::from(LifecycleError::new("hook failed")))
441        }
442    }
443
444    #[derive(Debug)]
445    struct CancelledExecutor;
446
447    impl NodeExecutor for CancelledExecutor {
448        type RunFuture<'a> = Ready<Result<()>>;
449
450        fn run(
451            &self,
452            _ctx: NodeContext,
453            _inputs: PortsIn,
454            _outputs: PortsOut,
455        ) -> Self::RunFuture<'_> {
456            ready(Err(PureflowError::cancelled("planned shutdown")))
457        }
458    }
459
460    #[derive(Debug, Default)]
461    struct RecordingMetadataSink {
462        events: Mutex<Vec<LifecycleEventKind>>,
463    }
464
465    impl RecordingMetadataSink {
466        fn recorded(&self) -> Vec<LifecycleEventKind> {
467            self.events
468                .lock()
469                .expect("metadata sink lock should not be poisoned")
470                .clone()
471        }
472    }
473
474    impl MetadataSink for RecordingMetadataSink {
475        fn record(&self, record: &MetadataRecord) -> Result<()> {
476            if let MetadataRecord::Lifecycle(event) = record {
477                self.events
478                    .lock()
479                    .expect("metadata sink lock should not be poisoned")
480                    .push(event.kind());
481            }
482            Ok(())
483        }
484    }
485
486    #[derive(Debug, Default)]
487    struct RecordingAllMetadataSink {
488        records: Mutex<Vec<MetadataRecord>>,
489    }
490
491    impl RecordingAllMetadataSink {
492        fn records(&self) -> Vec<MetadataRecord> {
493            self.records
494                .lock()
495                .expect("metadata sink lock should not be poisoned")
496                .clone()
497        }
498    }
499
500    impl MetadataSink for RecordingAllMetadataSink {
501        fn record(&self, record: &MetadataRecord) -> Result<()> {
502            self.records
503                .lock()
504                .expect("metadata sink lock should not be poisoned")
505                .push(record.clone());
506            Ok(())
507        }
508    }
509
510    #[derive(Debug)]
511    struct FailingMetadataSink;
512
513    impl MetadataSink for FailingMetadataSink {
514        fn record(&self, _record: &MetadataRecord) -> Result<()> {
515            Err(PureflowError::from(MetadataError::new(
516                "metadata sink failed",
517            )))
518        }
519    }
520
521    #[derive(Debug)]
522    struct FailingOnNodeFailedMetadataSink;
523
524    impl MetadataSink for FailingOnNodeFailedMetadataSink {
525        fn record(&self, record: &MetadataRecord) -> Result<()> {
526            if matches!(
527                record,
528                MetadataRecord::Lifecycle(event)
529                    if event.kind() == LifecycleEventKind::NodeFailed
530            ) {
531                return Err(PureflowError::from(MetadataError::new(
532                    "terminal metadata failed",
533                )));
534            }
535            Ok(())
536        }
537    }
538
539    #[derive(Debug)]
540    struct CancellingExecutor {
541        handle: CancellationHandle,
542        observations: Mutex<Vec<bool>>,
543    }
544
545    impl CancellingExecutor {
546        fn new(handle: CancellationHandle) -> Self {
547            Self {
548                handle,
549                observations: Mutex::new(Vec::new()),
550            }
551        }
552
553        fn record(&self, value: bool) {
554            self.observations
555                .lock()
556                .expect("cancelling executor observations lock should not be poisoned")
557                .push(value);
558        }
559
560        fn observations(&self) -> Vec<bool> {
561            self.observations
562                .lock()
563                .expect("cancelling executor observations lock should not be poisoned")
564                .clone()
565        }
566    }
567
568    impl NodeExecutor for CancellingExecutor {
569        type RunFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
570
571        fn run(
572            &self,
573            ctx: NodeContext,
574            _inputs: PortsIn,
575            _outputs: PortsOut,
576        ) -> Self::RunFuture<'_> {
577            Box::pin(async move {
578                self.record(ctx.is_cancelled());
579                let _first_request: bool = self
580                    .handle
581                    .cancel(CancellationRequest::new("runtime supervisor stopped node"));
582                self.record(ctx.is_cancelled());
583                Ok(())
584            })
585        }
586    }
587
588    fn context() -> NodeContext {
589        NodeContext::new(
590            workflow_id("flow"),
591            node_id("node"),
592            execution_metadata("run-1"),
593        )
594    }
595
596    fn deterministic_runtime() -> AsupersyncRuntime {
597        AsupersyncRuntime::deterministic_for_tests().expect("deterministic runtime should build")
598    }
599
600    #[test]
601    fn deterministic_runtime_for_tests_uses_current_thread_config() {
602        let runtime: AsupersyncRuntime = deterministic_runtime();
603
604        assert_eq!(runtime.runtime.config().worker_threads, 1);
605        assert_eq!(runtime.runtime.config().poll_budget, 1);
606    }
607
608    #[test]
609    fn production_runtime_builder_defaults_remain_separate_from_test_runtime() {
610        let production: AsupersyncRuntime =
611            AsupersyncRuntime::new().expect("production runtime should build");
612        let deterministic: AsupersyncRuntime = deterministic_runtime();
613
614        assert_eq!(production.runtime.config().poll_budget, 128);
615        assert_eq!(deterministic.runtime.config().worker_threads, 1);
616        assert_eq!(deterministic.runtime.config().poll_budget, 1);
617    }
618
619    #[test]
620    fn asupersync_runtime_runs_one_node() {
621        let runtime: AsupersyncRuntime = deterministic_runtime();
622        let executor: RecordingExecutor = RecordingExecutor::default();
623
624        runtime
625            .run_node(
626                &executor,
627                context(),
628                PortsIn::default(),
629                PortsOut::default(),
630            )
631            .expect("execution should succeed");
632
633        assert_eq!(executor.visited_node_names(), vec!["node"]);
634    }
635
636    #[test]
637    fn asupersync_runtime_preserves_executor_failures() {
638        let runtime: AsupersyncRuntime = deterministic_runtime();
639        let executor: FailingExecutor = FailingExecutor::execution("boom");
640
641        let err: PureflowError = runtime
642            .run_node(
643                &executor,
644                context(),
645                PortsIn::default(),
646                PortsOut::default(),
647            )
648            .expect_err("execution should fail");
649
650        assert_eq!(err, PureflowError::execution("boom"));
651    }
652
653    #[test]
654    fn asupersync_runtime_rejects_pre_cancelled_contexts() {
655        let runtime: AsupersyncRuntime = deterministic_runtime();
656        let executor: RecordingExecutor = RecordingExecutor::default();
657        let ctx: NodeContext =
658            context().with_cancellation(CancellationRequest::new("shutdown requested"));
659
660        let err: PureflowError = runtime
661            .run_node(&executor, ctx, PortsIn::default(), PortsOut::default())
662            .expect_err("cancelled execution should not run");
663
664        assert_eq!(
665            err,
666            PureflowError::from(CancellationError::new("shutdown requested"))
667        );
668        assert!(executor.visited_contexts().is_empty());
669    }
670
671    #[test]
672    fn asupersync_runtime_cancellation_handle_is_visible_inside_running_node() {
673        let runtime: AsupersyncRuntime = deterministic_runtime();
674        let handle: CancellationHandle = AsupersyncRuntime::cancellation_handle();
675        let executor: CancellingExecutor = CancellingExecutor::new(handle.clone());
676
677        runtime
678            .run_node_with_cancellation_handle(
679                &executor,
680                context(),
681                PortsIn::default(),
682                PortsOut::default(),
683                &handle,
684            )
685            .expect("execution should succeed");
686
687        assert_eq!(executor.observations(), vec![false, true]);
688        assert!(handle.is_cancelled());
689    }
690
691    #[test]
692    fn asupersync_runtime_rejects_child_context_after_shared_cancellation() {
693        let runtime: AsupersyncRuntime = deterministic_runtime();
694        let handle: CancellationHandle = AsupersyncRuntime::cancellation_handle();
695        let canceller: CancellingExecutor = CancellingExecutor::new(handle.clone());
696        let child: RecordingExecutor = RecordingExecutor::default();
697
698        runtime
699            .run_node_with_cancellation_handle(
700                &canceller,
701                context(),
702                PortsIn::default(),
703                PortsOut::default(),
704                &handle,
705            )
706            .expect("first execution should request cancellation");
707        let err: PureflowError = runtime
708            .run_node_with_cancellation_handle(
709                &child,
710                context(),
711                PortsIn::default(),
712                PortsOut::default(),
713                &handle,
714            )
715            .expect_err("shared cancellation should reject child execution");
716
717        assert_eq!(
718            err,
719            PureflowError::from(CancellationError::new("runtime supervisor stopped node"))
720        );
721        assert!(child.visited_contexts().is_empty());
722    }
723
724    #[test]
725    fn run_node_with_hook_emits_started_then_completed() {
726        let executor: RecordingExecutor = RecordingExecutor::default();
727        let hook: RecordingHook = RecordingHook::default();
728
729        block_on(run_node_with_hook(
730            &executor,
731            context(),
732            PortsIn::default(),
733            PortsOut::default(),
734            &hook,
735        ))
736        .expect("execution should succeed");
737
738        assert_eq!(
739            hook.recorded(),
740            vec![
741                LifecycleEventKind::NodeStarted,
742                LifecycleEventKind::NodeCompleted,
743            ]
744        );
745    }
746
747    #[test]
748    fn run_node_with_hook_emits_started_then_failed_and_preserves_executor_error() {
749        let executor: FailingExecutor = FailingExecutor::execution("boom");
750        let hook: RecordingHook = RecordingHook::default();
751
752        let err: PureflowError = block_on(run_node_with_hook(
753            &executor,
754            context(),
755            PortsIn::default(),
756            PortsOut::default(),
757            &hook,
758        ))
759        .expect_err("execution should fail");
760
761        assert_eq!(
762            hook.recorded(),
763            vec![
764                LifecycleEventKind::NodeStarted,
765                LifecycleEventKind::NodeFailed
766            ]
767        );
768        assert_eq!(err, PureflowError::execution("boom"));
769    }
770
771    #[test]
772    fn run_node_provides_noop_default_hook() {
773        let executor: RecordingExecutor = RecordingExecutor::default();
774
775        block_on(run_node(
776            &executor,
777            context(),
778            PortsIn::default(),
779            PortsOut::default(),
780        ))
781        .expect("execution should succeed");
782    }
783
784    #[test]
785    fn run_node_with_hook_propagates_hook_failures() {
786        let executor: RecordingExecutor = RecordingExecutor::default();
787        let err: PureflowError = block_on(run_node_with_hook(
788            &executor,
789            context(),
790            PortsIn::default(),
791            PortsOut::default(),
792            &FailingHook,
793        ))
794        .expect_err("hook failure should surface");
795
796        assert_eq!(err, PureflowError::from(LifecycleError::new("hook failed")));
797    }
798
799    #[test]
800    fn run_node_with_metadata_sink_records_lifecycle_events() {
801        let executor: RecordingExecutor = RecordingExecutor::default();
802        let sink: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
803
804        block_on(run_node_with_metadata_sink(
805            &executor,
806            context(),
807            PortsIn::default(),
808            PortsOut::default(),
809            sink.clone(),
810        ))
811        .expect("execution should succeed");
812
813        assert_eq!(
814            sink.recorded(),
815            vec![
816                LifecycleEventKind::NodeStarted,
817                LifecycleEventKind::NodeCompleted,
818            ]
819        );
820    }
821
822    #[test]
823    fn run_node_with_metadata_sink_records_cancelled_lifecycle_for_cancellation_errors() {
824        let executor: CancelledExecutor = CancelledExecutor;
825        let sink: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
826
827        let err: PureflowError = block_on(run_node_with_metadata_sink(
828            &executor,
829            context(),
830            PortsIn::default(),
831            PortsOut::default(),
832            sink.clone(),
833        ))
834        .expect_err("cancelled execution should fail at the node boundary");
835
836        assert_eq!(err, PureflowError::cancelled("planned shutdown"));
837        assert_eq!(
838            sink.recorded(),
839            vec![
840                LifecycleEventKind::NodeStarted,
841                LifecycleEventKind::NodeCancelled,
842            ]
843        );
844    }
845
846    #[test]
847    fn run_node_with_metadata_sink_records_node_error_metadata() {
848        let executor: FailingExecutor = FailingExecutor::execution("boom");
849        let sink: Arc<RecordingAllMetadataSink> = Arc::new(RecordingAllMetadataSink::default());
850
851        let err: PureflowError = block_on(run_node_with_metadata_sink(
852            &executor,
853            context(),
854            PortsIn::default(),
855            PortsOut::default(),
856            sink.clone(),
857        ))
858        .expect_err("execution should fail");
859        let records: Vec<MetadataRecord> = sink.records();
860        let error_record = records
861            .iter()
862            .find_map(|record: &MetadataRecord| match record {
863                MetadataRecord::Error(error) => Some(error),
864                _ => None,
865            })
866            .expect("node error metadata should be recorded");
867
868        assert_eq!(err, PureflowError::execution("boom"));
869        assert_eq!(error_record.kind(), ErrorMetadataKind::NodeFailed);
870        assert_eq!(
871            error_record
872                .node_id()
873                .expect("node error should include node id")
874                .as_str(),
875            "node"
876        );
877        assert_eq!(error_record.error(), &PureflowError::execution("boom"));
878    }
879
880    #[cfg(feature = "tracing")]
881    #[test]
882    fn tracing_feature_uses_stable_runtime_labels() {
883        let event: LifecycleEvent = LifecycleEvent::new(LifecycleEventKind::NodeStarted, context());
884        let lifecycle_record: MetadataRecord = MetadataRecord::Lifecycle(event);
885        let error_record: MetadataRecord = MetadataRecord::Error(ErrorMetadataRecord::node_failed(
886            &context(),
887            PureflowError::execution("boom"),
888        ));
889
890        assert_eq!(
891            lifecycle_event_kind_label(LifecycleEventKind::NodeStarted),
892            "node_started"
893        );
894        assert_eq!(metadata_record_kind_label(&lifecycle_record), "lifecycle");
895        assert_eq!(metadata_record_kind_label(&error_record), "error");
896    }
897
898    #[test]
899    fn asupersync_runtime_can_collect_metadata() {
900        let runtime: AsupersyncRuntime = deterministic_runtime();
901        let executor: RecordingExecutor = RecordingExecutor::default();
902        let sink: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
903
904        runtime
905            .run_node_with_metadata_sink(
906                &executor,
907                context(),
908                PortsIn::default(),
909                PortsOut::default(),
910                sink.clone(),
911            )
912            .expect("execution should succeed");
913
914        assert_eq!(
915            sink.recorded(),
916            vec![
917                LifecycleEventKind::NodeStarted,
918                LifecycleEventKind::NodeCompleted,
919            ]
920        );
921    }
922
923    #[test]
924    fn run_node_with_metadata_sink_propagates_start_collection_failures() {
925        let executor: RecordingExecutor = RecordingExecutor::default();
926
927        let err: PureflowError = block_on(run_node_with_metadata_sink(
928            &executor,
929            context(),
930            PortsIn::default(),
931            PortsOut::default(),
932            Arc::new(FailingMetadataSink),
933        ))
934        .expect_err("metadata failure should surface");
935
936        assert_eq!(
937            err,
938            PureflowError::from(MetadataError::new("metadata sink failed"))
939        );
940        assert!(executor.visited_contexts().is_empty());
941    }
942
943    #[test]
944    fn executor_failure_takes_precedence_over_terminal_metadata_failure() {
945        let executor: FailingExecutor = FailingExecutor::execution("boom");
946
947        let err: PureflowError = block_on(run_node_with_metadata_sink(
948            &executor,
949            context(),
950            PortsIn::default(),
951            PortsOut::default(),
952            Arc::new(FailingOnNodeFailedMetadataSink),
953        ))
954        .expect_err("executor failure should surface");
955
956        assert_eq!(err, PureflowError::execution("boom"));
957    }
958}