Skip to main content

pureflow_engine/
lib.rs

1//! High-level workflow orchestration for Pureflow.
2
3use std::num::NonZeroUsize;
4use std::time::Duration;
5use std::{collections::BTreeMap, fmt, future::Future, sync::Arc};
6
7use pureflow_contract::{NodeContract, PortContract, SchemaRef};
8use pureflow_core::{
9    BatchExecutor, BatchInputs, BatchOutputs, CancellationHandle, PureflowError, InputPortHandle,
10    MetadataRecord, MetadataSink, NodeExecutor, OutputPacketValidator, OutputPortHandle,
11    PortPacket, PortSendError, PortsIn, PortsOut, Result, bounded_edge_channel,
12    context::{CancellationRequest, CancellationToken, ExecutionMetadata, NodeContext},
13    lifecycle::{LifecycleHook, NoopLifecycleHook},
14    message::MessageEndpoint,
15    metadata::{
16        DeadlockDiagnosticMetadata, ErrorDiagnosticMetadata, ErrorMetadataRecord, NoopMetadataSink,
17    },
18};
19use pureflow_runtime::run_node_with_observers;
20use pureflow_types::{NodeId, PortId, WorkflowId};
21use pureflow_workflow::{
22    NodeDefinition, PortDirection, WorkflowDefinition, WorkflowValidationError,
23};
24use futures::{
25    channel::oneshot,
26    future::{BoxFuture, Either, select},
27    stream::{FuturesUnordered, Next, StreamExt},
28};
29
// NOTE(review): the use site is outside this chunk; judging by the name and the
// `bounded_edge_channel` import this is presumably the per-edge channel capacity
// used when no explicit capacity is configured — confirm.
/// Edge capacity constant equal to one (`NonZeroUsize::MIN`).
const DEFAULT_EDGE_CAPACITY: NonZeroUsize = NonZeroUsize::MIN;
31
32/// Runtime policy for executing one workflow graph.
33#[derive(Debug, Clone, Copy, PartialEq, Eq)]
34pub struct WorkflowRunPolicy {
35    cycle_policy: CycleRunPolicy,
36    watchdog_policy: WorkflowWatchdogPolicy,
37}
38
39impl WorkflowRunPolicy {
40    /// Reject cyclic workflows at run time.
41    #[must_use]
42    pub const fn acyclic() -> Self {
43        Self {
44            cycle_policy: CycleRunPolicy::Reject,
45            watchdog_policy: WorkflowWatchdogPolicy::Disabled,
46        }
47    }
48
49    /// Allow cycle-enabled workflow graphs to run as feedback loops.
50    #[must_use]
51    pub const fn feedback_loops(feedback_loop: FeedbackLoopRunPolicy) -> Self {
52        Self {
53            cycle_policy: CycleRunPolicy::AllowFeedbackLoops(feedback_loop),
54            watchdog_policy: WorkflowWatchdogPolicy::Disabled,
55        }
56    }
57
58    /// Configured cycle behavior.
59    #[must_use]
60    pub const fn cycle_policy(&self) -> CycleRunPolicy {
61        self.cycle_policy
62    }
63
64    /// Return a copy of this policy with watchdog behavior attached.
65    #[must_use]
66    pub const fn with_watchdog(mut self, watchdog_policy: WorkflowWatchdogPolicy) -> Self {
67        self.watchdog_policy = watchdog_policy;
68        self
69    }
70
71    /// Configured no-progress watchdog behavior.
72    #[must_use]
73    pub const fn watchdog_policy(&self) -> WorkflowWatchdogPolicy {
74        self.watchdog_policy
75    }
76}
77
78impl Default for WorkflowRunPolicy {
79    fn default() -> Self {
80        Self::acyclic()
81    }
82}
83
84/// Runtime behavior for cyclic workflow graphs.
85#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub enum CycleRunPolicy {
87    /// Reject directed cycles before starting nodes.
88    Reject,
89    /// Start a cycle-enabled graph as an explicit feedback loop.
90    AllowFeedbackLoops(FeedbackLoopRunPolicy),
91}
92
93/// Startup and termination behavior for feedback-loop workflows.
94#[derive(Debug, Clone, Copy, PartialEq, Eq)]
95pub struct FeedbackLoopRunPolicy {
96    startup: FeedbackLoopStartup,
97    termination: FeedbackLoopTermination,
98}
99
100impl FeedbackLoopRunPolicy {
101    /// Create an explicit feedback-loop policy.
102    #[must_use]
103    pub const fn new(startup: FeedbackLoopStartup, termination: FeedbackLoopTermination) -> Self {
104        Self {
105            startup,
106            termination,
107        }
108    }
109
110    /// Start every node immediately and finish when all node tasks finish.
111    #[must_use]
112    pub const fn start_all_nodes_until_complete() -> Self {
113        Self::new(
114            FeedbackLoopStartup::StartAllNodes,
115            FeedbackLoopTermination::AllNodesComplete,
116        )
117    }
118
119    /// Configured feedback-loop startup behavior.
120    #[must_use]
121    pub const fn startup(&self) -> FeedbackLoopStartup {
122        self.startup
123    }
124
125    /// Configured feedback-loop termination behavior.
126    #[must_use]
127    pub const fn termination(&self) -> FeedbackLoopTermination {
128        self.termination
129    }
130}
131
/// Rule deciding which nodes of a feedback-loop graph are launched at start.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FeedbackLoopStartup {
    /// Launch every declared node right away.
    StartAllNodes,
}
138
/// Rule deciding when a feedback-loop graph counts as finished.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FeedbackLoopTermination {
    /// Finish only once every node task has completed successfully.
    AllNodesComplete,
}
145
146/// Runtime watchdog behavior for workflow execution.
147#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
148pub enum WorkflowWatchdogPolicy {
149    /// Do not monitor no-progress workflow execution.
150    #[default]
151    Disabled,
152    /// Cancel the workflow if no scheduled node reaches a terminal state before
153    /// the configured deadline. The deadline resets after each node result.
154    Deadlock(DeadlockWatchdogPolicy),
155}
156
157impl WorkflowWatchdogPolicy {
158    /// Return a disabled watchdog policy.
159    #[must_use]
160    pub const fn disabled() -> Self {
161        Self::Disabled
162    }
163
164    /// Cancel the workflow after a no-progress interval.
165    ///
166    /// A zero duration is allowed and fires on the next watchdog poll, which is
167    /// useful for deterministic tests.
168    #[must_use]
169    pub const fn deadlock_after(no_progress_timeout: Duration) -> Self {
170        Self::Deadlock(DeadlockWatchdogPolicy::new(no_progress_timeout))
171    }
172}
173
/// Settings for the no-progress (deadlock) watchdog.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DeadlockWatchdogPolicy {
    no_progress_timeout: Duration,
}

impl DeadlockWatchdogPolicy {
    /// Watchdog settings that allow at most `no_progress_timeout` without
    /// progress.
    ///
    /// `Duration::ZERO` is accepted and fires on the next watchdog poll,
    /// which is handy for deterministic tests.
    #[must_use]
    pub const fn new(no_progress_timeout: Duration) -> Self {
        let policy = Self { no_progress_timeout };
        policy
    }

    /// Longest interval tolerated without any node reaching a terminal state.
    #[must_use]
    pub const fn no_progress_timeout(&self) -> Duration {
        let Self { no_progress_timeout } = *self;
        no_progress_timeout
    }
}
198
199/// Diagnostic state reported when the workflow watchdog detects no progress.
200#[derive(Debug, Clone, PartialEq, Eq)]
201pub struct WorkflowDeadlockDiagnostic {
202    workflow_id: WorkflowId,
203    scheduled_node_count: usize,
204    pending_node_count: usize,
205    completed_node_count: usize,
206    failed_node_count: usize,
207    cancelled_node_count: usize,
208    bounded_edge_count: usize,
209    no_progress_timeout: Duration,
210    cycle_policy: CycleRunPolicy,
211}
212
213impl WorkflowDeadlockDiagnostic {
214    fn from_run(
215        workflow: &WorkflowDefinition,
216        summary: &WorkflowRunSummary,
217        policy: WorkflowRunPolicy,
218        watchdog: DeadlockWatchdogPolicy,
219    ) -> Self {
220        Self {
221            workflow_id: workflow.id().clone(),
222            scheduled_node_count: summary.scheduled_node_count(),
223            pending_node_count: summary.pending_node_count(),
224            completed_node_count: summary.completed_node_count(),
225            failed_node_count: summary.failed_node_count(),
226            cancelled_node_count: summary.cancelled_node_count(),
227            bounded_edge_count: workflow.edges().len(),
228            no_progress_timeout: watchdog.no_progress_timeout(),
229            cycle_policy: policy.cycle_policy(),
230        }
231    }
232
233    /// Workflow that stopped making progress.
234    #[must_use]
235    pub const fn workflow_id(&self) -> &WorkflowId {
236        &self.workflow_id
237    }
238
239    /// Nodes scheduled for this run.
240    #[must_use]
241    pub const fn scheduled_node_count(&self) -> usize {
242        self.scheduled_node_count
243    }
244
245    /// Nodes still pending when the watchdog fired.
246    #[must_use]
247    pub const fn pending_node_count(&self) -> usize {
248        self.pending_node_count
249    }
250
251    /// Nodes completed before the watchdog fired.
252    #[must_use]
253    pub const fn completed_node_count(&self) -> usize {
254        self.completed_node_count
255    }
256
257    /// Nodes failed before the watchdog fired.
258    #[must_use]
259    pub const fn failed_node_count(&self) -> usize {
260        self.failed_node_count
261    }
262
263    /// Nodes cancelled before the watchdog fired.
264    #[must_use]
265    pub const fn cancelled_node_count(&self) -> usize {
266        self.cancelled_node_count
267    }
268
269    /// Bounded graph edges in the workflow.
270    #[must_use]
271    pub const fn bounded_edge_count(&self) -> usize {
272        self.bounded_edge_count
273    }
274
275    /// No-progress interval that elapsed before the watchdog fired.
276    #[must_use]
277    pub const fn no_progress_timeout(&self) -> Duration {
278        self.no_progress_timeout
279    }
280
281    /// Cycle policy active when the watchdog fired.
282    #[must_use]
283    pub const fn cycle_policy(&self) -> CycleRunPolicy {
284        self.cycle_policy
285    }
286
287    fn to_metadata_diagnostic(&self) -> ErrorDiagnosticMetadata {
288        let metadata: DeadlockDiagnosticMetadata = DeadlockDiagnosticMetadata::new(
289            self.scheduled_node_count,
290            self.pending_node_count,
291            self.bounded_edge_count,
292            duration_millis_u64(self.no_progress_timeout),
293            cycle_run_policy_label(self.cycle_policy),
294        )
295        .with_terminal_counts(
296            self.completed_node_count,
297            self.failed_node_count,
298            self.cancelled_node_count,
299        );
300
301        match self.cycle_policy {
302            CycleRunPolicy::Reject => ErrorDiagnosticMetadata::workflow_deadlock(metadata),
303            CycleRunPolicy::AllowFeedbackLoops(feedback_loop) => {
304                ErrorDiagnosticMetadata::workflow_deadlock(metadata.with_feedback_loop(
305                    feedback_loop_startup_label(feedback_loop.startup()),
306                    feedback_loop_termination_label(feedback_loop.termination()),
307                ))
308            }
309        }
310    }
311}
312
313impl fmt::Display for WorkflowDeadlockDiagnostic {
314    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315        write!(
316            f,
317            "workflow `{}` watchdog detected no workflow progress for {:?}: scheduled_nodes={}, pending_nodes={}, completed_nodes={}, failed_nodes={}, cancelled_nodes={}, bounded_edges={}, cycle_policy={:?}",
318            self.workflow_id,
319            self.no_progress_timeout,
320            self.scheduled_node_count,
321            self.pending_node_count,
322            self.completed_node_count,
323            self.failed_node_count,
324            self.cancelled_node_count,
325            self.bounded_edge_count,
326            self.cycle_policy
327        )
328    }
329}
330
/// Convert `duration` to whole milliseconds as a `u64`.
///
/// `Duration::as_millis` returns a `u128`. Values that do not fit in `u64`
/// saturate to `u64::MAX` instead of failing.
fn duration_millis_u64(duration: Duration) -> u64 {
    let millis: u128 = duration.as_millis();
    match u64::try_from(millis) {
        Ok(narrowed) => narrowed,
        Err(_) => u64::MAX,
    }
}
334
335const fn cycle_run_policy_label(policy: CycleRunPolicy) -> &'static str {
336    match policy {
337        CycleRunPolicy::Reject => "reject",
338        CycleRunPolicy::AllowFeedbackLoops(_feedback_loop) => "allow_feedback_loops",
339    }
340}
341
342const fn feedback_loop_startup_label(startup: FeedbackLoopStartup) -> &'static str {
343    match startup {
344        FeedbackLoopStartup::StartAllNodes => "start_all_nodes",
345    }
346}
347
348const fn feedback_loop_termination_label(termination: FeedbackLoopTermination) -> &'static str {
349    match termination {
350        FeedbackLoopTermination::AllNodesComplete => "all_nodes_complete",
351    }
352}
353
/// Final state reached by one workflow run.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WorkflowTerminalState {
    /// All scheduled nodes finished successfully.
    Completed,
    /// The first observed error was not a cancellation, for example an
    /// execution, lifecycle, metadata, or validation failure.
    Failed,
    /// The first observed error was a cancellation.
    Cancelled,
}
365
366/// Aggregate outcome for one workflow run.
367#[derive(Debug, Clone, PartialEq, Eq)]
368pub struct WorkflowRunSummary {
369    terminal_state: WorkflowTerminalState,
370    scheduled_node_count: usize,
371    completed_node_count: usize,
372    failed_node_count: usize,
373    cancelled_node_count: usize,
374    observed_message_count: usize,
375    error_count: usize,
376    first_error: Option<PureflowError>,
377    deadlock_diagnostic: Option<WorkflowDeadlockDiagnostic>,
378}
379
380impl WorkflowRunSummary {
381    /// Start an empty summary for a run with `scheduled_node_count` nodes.
382    #[must_use]
383    pub const fn new(scheduled_node_count: usize) -> Self {
384        Self {
385            terminal_state: WorkflowTerminalState::Completed,
386            scheduled_node_count,
387            completed_node_count: 0,
388            failed_node_count: 0,
389            cancelled_node_count: 0,
390            observed_message_count: 0,
391            error_count: 0,
392            first_error: None,
393            deadlock_diagnostic: None,
394        }
395    }
396
397    /// Terminal state after all scheduled node runs were observed.
398    #[must_use]
399    pub const fn terminal_state(&self) -> WorkflowTerminalState {
400        self.terminal_state
401    }
402
403    /// Number of nodes scheduled for execution.
404    #[must_use]
405    pub const fn scheduled_node_count(&self) -> usize {
406        self.scheduled_node_count
407    }
408
409    /// Number of nodes that completed successfully.
410    #[must_use]
411    pub const fn completed_node_count(&self) -> usize {
412        self.completed_node_count
413    }
414
415    /// Number of nodes that returned a non-cancellation error.
416    #[must_use]
417    pub const fn failed_node_count(&self) -> usize {
418        self.failed_node_count
419    }
420
421    /// Number of nodes that returned a cancellation error.
422    #[must_use]
423    pub const fn cancelled_node_count(&self) -> usize {
424        self.cancelled_node_count
425    }
426
427    /// Number of message observations accounted for by the workflow runner.
428    ///
429    /// This remains zero until queue-pressure/message accounting is attached to
430    /// the runner in the observability tranche.
431    #[must_use]
432    pub const fn observed_message_count(&self) -> usize {
433        self.observed_message_count
434    }
435
436    /// Number of node results that ended in an error.
437    #[must_use]
438    pub const fn error_count(&self) -> usize {
439        self.error_count
440    }
441
442    /// Number of scheduled nodes that have not yet reached a terminal state.
443    #[must_use]
444    pub const fn pending_node_count(&self) -> usize {
445        self.scheduled_node_count.saturating_sub(
446            self.completed_node_count
447                .saturating_add(self.failed_node_count)
448                .saturating_add(self.cancelled_node_count),
449        )
450    }
451
452    /// First error observed by the workflow runner, if any.
453    #[must_use]
454    pub const fn first_error(&self) -> Option<&PureflowError> {
455        self.first_error.as_ref()
456    }
457
458    /// Deadlock diagnostic captured by the workflow watchdog, if it fired.
459    #[must_use]
460    pub const fn deadlock_diagnostic(&self) -> Option<&WorkflowDeadlockDiagnostic> {
461        self.deadlock_diagnostic.as_ref()
462    }
463
464    /// Convert a summary into the legacy `Result<()>` shape.
465    ///
466    /// # Errors
467    ///
468    /// Returns the first observed workflow error when the terminal state is not
469    /// [`WorkflowTerminalState::Completed`].
470    pub fn into_result(self) -> Result<()> {
471        self.first_error.map_or(Ok(()), Err)
472    }
473
474    const fn record_success(&mut self) {
475        self.completed_node_count = self.completed_node_count.saturating_add(1);
476    }
477
478    fn record_error(&mut self, err: PureflowError) {
479        self.error_count = self.error_count.saturating_add(1);
480
481        if matches!(err, PureflowError::Cancellation(_)) {
482            self.cancelled_node_count = self.cancelled_node_count.saturating_add(1);
483        } else {
484            self.failed_node_count = self.failed_node_count.saturating_add(1);
485        }
486
487        if self.first_error.is_none() {
488            self.terminal_state = if matches!(err, PureflowError::Cancellation(_)) {
489                WorkflowTerminalState::Cancelled
490            } else {
491                WorkflowTerminalState::Failed
492            };
493            self.first_error = Some(err);
494        }
495    }
496
497    fn record_workflow_error(&mut self, err: PureflowError) {
498        self.error_count = self.error_count.saturating_add(1);
499
500        if self.first_error.is_none() {
501            self.terminal_state = if matches!(err, PureflowError::Cancellation(_)) {
502                WorkflowTerminalState::Cancelled
503            } else {
504                WorkflowTerminalState::Failed
505            };
506            self.first_error = Some(err);
507        }
508    }
509
510    fn record_deadlock_diagnostic(&mut self, diagnostic: WorkflowDeadlockDiagnostic) {
511        self.deadlock_diagnostic = Some(diagnostic);
512    }
513}
514
515/// Registry that resolves workflow nodes to runtime executors.
516///
517/// The registry owns node-to-executor selection while the workflow runner owns
518/// graph wiring, cancellation, lifecycle, and metadata behavior.
519pub trait NodeExecutorRegistry: Sync {
520    /// Concrete executor type returned for nodes in this registry.
521    type Executor: NodeExecutor + ?Sized;
522
523    /// Resolve an executor for one workflow node.
524    ///
525    /// # Errors
526    ///
527    /// Returns an error when no executor is registered for the node.
528    fn executor_for(&self, node_id: &NodeId) -> Result<&Self::Executor>;
529}
530
531/// Registry adapter that runs every workflow node through the same executor.
532#[derive(Debug, Clone, Copy)]
533pub struct SingleNodeExecutorRegistry<'a, E: ?Sized> {
534    executor: &'a E,
535}
536
537impl<'a, E: ?Sized> SingleNodeExecutorRegistry<'a, E> {
538    /// Create a registry that resolves every node to `executor`.
539    #[must_use]
540    pub const fn new(executor: &'a E) -> Self {
541        Self { executor }
542    }
543}
544
545impl<E> NodeExecutorRegistry for SingleNodeExecutorRegistry<'_, E>
546where
547    E: NodeExecutor + ?Sized,
548{
549    type Executor = E;
550
551    fn executor_for(&self, _node_id: &NodeId) -> Result<&Self::Executor> {
552        Ok(self.executor)
553    }
554}
555
556/// In-memory registry keyed by workflow node identifier.
557#[derive(Debug, Clone)]
558pub struct StaticNodeExecutorRegistry<E> {
559    executors: BTreeMap<NodeId, E>,
560}
561
562impl<E> StaticNodeExecutorRegistry<E> {
563    /// Create a static registry from a node-to-executor map.
564    #[must_use]
565    pub const fn new(executors: BTreeMap<NodeId, E>) -> Self {
566        Self { executors }
567    }
568
569    /// Return the registered executor map.
570    #[must_use]
571    pub const fn executors(&self) -> &BTreeMap<NodeId, E> {
572        &self.executors
573    }
574
575    /// Insert or replace the executor for one node.
576    pub fn insert(&mut self, node_id: NodeId, executor: E) -> Option<E> {
577        self.executors.insert(node_id, executor)
578    }
579}
580
581impl<E> NodeExecutorRegistry for StaticNodeExecutorRegistry<E>
582where
583    E: NodeExecutor,
584{
585    type Executor = E;
586
587    fn executor_for(&self, node_id: &NodeId) -> Result<&Self::Executor> {
588        self.executors.get(node_id).ok_or_else(|| {
589            PureflowError::execution(format!(
590                "no executor registered for workflow node `{node_id}`"
591            ))
592        })
593    }
594}
595
596/// Node executor adapter for host-owned batch implementations such as WASM.
597///
598/// The adapter owns the host side of the batch boundary: it drains all
599/// currently connected input edges, invokes the topology-blind batch executor,
600/// and sends every returned packet through normal [`PortsOut`] validation
601/// before a packet can enter graph edges.
602#[derive(Debug)]
603pub struct BatchNodeExecutor<E> {
604    executor: E,
605}
606
607impl<E> BatchNodeExecutor<E> {
608    /// Wrap one batch executor as a runtime node executor.
609    #[must_use]
610    pub const fn new(executor: E) -> Self {
611        Self { executor }
612    }
613
614    /// Borrow the wrapped batch executor.
615    #[must_use]
616    pub const fn executor(&self) -> &E {
617        &self.executor
618    }
619}
620
621impl<E> NodeExecutor for BatchNodeExecutor<E>
622where
623    E: BatchExecutor,
624{
625    type RunFuture<'a>
626        = BoxFuture<'a, Result<()>>
627    where
628        Self: 'a;
629
630    fn run(&self, ctx: NodeContext, inputs: PortsIn, outputs: PortsOut) -> Self::RunFuture<'_> {
631        Box::pin(run_batch_node_executor(
632            &self.executor,
633            ctx,
634            inputs,
635            outputs,
636        ))
637    }
638}
639
640async fn run_batch_node_executor<E>(
641    executor: &E,
642    ctx: NodeContext,
643    mut inputs: PortsIn,
644    outputs: PortsOut,
645) -> Result<()>
646where
647    E: BatchExecutor,
648{
649    let cancellation: CancellationToken = ctx.cancellation_token();
650    let mut batch_inputs: BatchInputs = BatchInputs::new();
651    while let Some((port_id, packet)) = inputs.recv_any(&cancellation).await? {
652        batch_inputs.push(port_id, packet);
653    }
654
655    let batch_outputs: BatchOutputs = executor.invoke(batch_inputs)?;
656    send_batch_outputs(&outputs, batch_outputs, &cancellation).await
657}
658
659async fn send_batch_outputs(
660    outputs: &PortsOut,
661    batch_outputs: BatchOutputs,
662    cancellation: &CancellationToken,
663) -> Result<()> {
664    for (port_id, packets) in batch_outputs.into_packets_by_port() {
665        send_batch_output_port(outputs, &port_id, packets, cancellation).await?;
666    }
667    Ok(())
668}
669
670async fn send_batch_output_port(
671    outputs: &PortsOut,
672    port_id: &PortId,
673    packets: Vec<PortPacket>,
674    cancellation: &CancellationToken,
675) -> Result<()> {
676    for packet in packets {
677        outputs.send(port_id, packet, cancellation).await?;
678    }
679    Ok(())
680}
681
682/// Output-port contract subset used by the workflow runner.
683#[derive(Debug, Clone)]
684pub struct WorkflowOutputContracts {
685    outputs_by_node: BTreeMap<NodeId, BTreeMap<PortId, Option<SchemaRef>>>,
686}
687
688impl WorkflowOutputContracts {
689    /// Build output validation contracts for a workflow.
690    ///
691    /// # Errors
692    ///
693    /// Returns an error when any workflow output port lacks a matching output
694    /// contract or a contract references an unknown workflow output.
695    pub fn from_node_contracts(
696        workflow: &WorkflowDefinition,
697        contracts: &[NodeContract],
698    ) -> Result<Self> {
699        let contract_map: BTreeMap<&NodeId, &NodeContract> = contracts
700            .iter()
701            .map(|contract: &NodeContract| (contract.id(), contract))
702            .collect();
703        let mut outputs_by_node: BTreeMap<NodeId, BTreeMap<PortId, Option<SchemaRef>>> =
704            BTreeMap::new();
705
706        for node in workflow.nodes() {
707            let Some(contract): Option<&NodeContract> = contract_map.get(node.id()).copied() else {
708                if node.output_ports().is_empty() {
709                    outputs_by_node.insert(node.id().clone(), BTreeMap::new());
710                    continue;
711                }
712
713                return Err(PureflowError::execution(format!(
714                    "no output contract supplied for workflow node `{}`",
715                    node.id()
716                )));
717            };
718            let mut output_contracts: BTreeMap<PortId, Option<SchemaRef>> = BTreeMap::new();
719
720            for port_id in node.output_ports() {
721                let port_contract: &PortContract = contract
722                    .ports()
723                    .iter()
724                    .find(|port: &&PortContract| port.port_id() == port_id)
725                    .ok_or_else(|| {
726                        PureflowError::execution(format!(
727                            "node `{}` output port `{port_id}` has no output contract",
728                            node.id()
729                        ))
730                    })?;
731                if port_contract.direction() != PortDirection::Output {
732                    return Err(PureflowError::execution(format!(
733                        "node `{}` port `{port_id}` contract is not an output contract",
734                        node.id()
735                    )));
736                }
737                output_contracts.insert(port_id.clone(), port_contract.schema().cloned());
738            }
739
740            for port_contract in contract.ports() {
741                if port_contract.direction() == PortDirection::Output
742                    && !node.output_ports().contains(port_contract.port_id())
743                {
744                    return Err(PureflowError::execution(format!(
745                        "node `{}` contract references unknown output port `{}`",
746                        node.id(),
747                        port_contract.port_id()
748                    )));
749                }
750            }
751
752            outputs_by_node.insert(node.id().clone(), output_contracts);
753        }
754
755        for contract in contracts {
756            if workflow
757                .nodes()
758                .iter()
759                .all(|node: &NodeDefinition| node.id() != contract.id())
760            {
761                return Err(PureflowError::execution(format!(
762                    "output contract references unknown workflow node `{}`",
763                    contract.id()
764                )));
765            }
766        }
767
768        Ok(Self { outputs_by_node })
769    }
770
771    fn output_contracts_for(
772        &self,
773        node_id: &NodeId,
774    ) -> Option<&BTreeMap<PortId, Option<SchemaRef>>> {
775        self.outputs_by_node.get(node_id)
776    }
777}
778
779#[derive(Debug, Clone)]
780struct ContractOutputValidator {
781    workflow_id: WorkflowId,
782    node_id: NodeId,
783    execution: ExecutionMetadata,
784    output_contracts: BTreeMap<PortId, Option<SchemaRef>>,
785}
786
787impl ContractOutputValidator {
788    const fn new(
789        workflow_id: WorkflowId,
790        node_id: NodeId,
791        execution: ExecutionMetadata,
792        output_contracts: BTreeMap<PortId, Option<SchemaRef>>,
793    ) -> Self {
794        Self {
795            workflow_id,
796            node_id,
797            execution,
798            output_contracts,
799        }
800    }
801
802    fn reject(port_id: &PortId, reason: impl Into<String>) -> PortSendError {
803        PortSendError::Rejected {
804            port_id: port_id.clone(),
805            reason: reason.into(),
806        }
807    }
808}
809
810impl OutputPacketValidator for ContractOutputValidator {
811    fn validate(
812        &self,
813        port_id: &PortId,
814        packet: &PortPacket,
815    ) -> std::result::Result<(), PortSendError> {
816        if !self.output_contracts.contains_key(port_id) {
817            return Err(Self::reject(
818                port_id,
819                format!(
820                    "node `{}` output port `{port_id}` has no output contract",
821                    self.node_id
822                ),
823            ));
824        }
825
826        if packet.metadata().workflow_id() != &self.workflow_id {
827            return Err(Self::reject(
828                port_id,
829                format!(
830                    "packet workflow `{}` does not match workflow `{}`",
831                    packet.metadata().workflow_id(),
832                    self.workflow_id
833                ),
834            ));
835        }
836
837        if packet.metadata().execution() != &self.execution {
838            return Err(Self::reject(
839                port_id,
840                format!(
841                    "packet execution `{}` does not match execution `{}`",
842                    packet.metadata().execution().execution_id(),
843                    self.execution.execution_id()
844                ),
845            ));
846        }
847
848        let Some(source): Option<&MessageEndpoint> = packet.metadata().route().source() else {
849            return Err(Self::reject(port_id, "packet route has no source endpoint"));
850        };
851        if source.node_id() != &self.node_id || source.port_id() != port_id {
852            return Err(Self::reject(
853                port_id,
854                format!(
855                    "packet source `{}:{}` does not match output `{}:{port_id}`",
856                    source.node_id(),
857                    source.port_id(),
858                    self.node_id
859                ),
860            ));
861        }
862
863        Ok(())
864    }
865}
866
867/// Execute the workflow by resolving one executor for each node from a registry.
868///
869/// # Errors
870///
871/// Returns an error if the default run policy rejects the workflow shape or
872/// executor resolution fails.
873pub async fn run_workflow_with_registry_summary<R>(
874    workflow: &WorkflowDefinition,
875    execution: &ExecutionMetadata,
876    registry: &R,
877) -> Result<WorkflowRunSummary>
878where
879    R: NodeExecutorRegistry + ?Sized,
880{
881    let lifecycle_hook: NoopLifecycleHook = NoopLifecycleHook;
882    run_workflow_with_registry_and_observers_summary(
883        workflow,
884        execution,
885        registry,
886        &lifecycle_hook,
887        Arc::new(NoopMetadataSink),
888    )
889    .await
890}
891
892/// Execute the workflow through a registry with an explicit run policy.
893///
894/// # Errors
895///
896/// Returns an error if the run policy rejects the workflow shape or executor
897/// resolution fails.
898pub async fn run_workflow_with_registry_policy_summary<R>(
899    workflow: &WorkflowDefinition,
900    execution: &ExecutionMetadata,
901    registry: &R,
902    policy: WorkflowRunPolicy,
903) -> Result<WorkflowRunSummary>
904where
905    R: NodeExecutorRegistry + ?Sized,
906{
907    let lifecycle_hook: NoopLifecycleHook = NoopLifecycleHook;
908    run_workflow_with_registry_and_observers_summary_inner(
909        workflow,
910        execution,
911        registry,
912        &lifecycle_hook,
913        Arc::new(NoopMetadataSink),
914        policy,
915        None,
916    )
917    .await
918}
919
920/// Execute the workflow by resolving one executor for each node from a registry.
921///
922/// # Errors
923///
924/// Returns an error if the default run policy rejects the workflow shape,
925/// executor resolution fails, or any node execution fails.
926pub async fn run_workflow_with_registry<R>(
927    workflow: &WorkflowDefinition,
928    execution: &ExecutionMetadata,
929    registry: &R,
930) -> Result<()>
931where
932    R: NodeExecutorRegistry + ?Sized,
933{
934    run_workflow_with_registry_summary(workflow, execution, registry)
935        .await?
936        .into_result()
937}
938
939/// Execute the workflow through a registry with an explicit run policy.
940///
941/// # Errors
942///
943/// Returns an error if the run policy rejects the workflow shape, executor
944/// resolution fails, or any node execution fails.
945pub async fn run_workflow_with_registry_policy<R>(
946    workflow: &WorkflowDefinition,
947    execution: &ExecutionMetadata,
948    registry: &R,
949    policy: WorkflowRunPolicy,
950) -> Result<()>
951where
952    R: NodeExecutorRegistry + ?Sized,
953{
954    run_workflow_with_registry_policy_summary(workflow, execution, registry, policy)
955        .await?
956        .into_result()
957}
958
959/// Execute the workflow through a registry with an explicit run policy and
960/// emit metadata records.
961///
962/// # Errors
963///
964/// Returns an error if the run policy rejects the workflow shape or executor
965/// resolution fails.
966pub async fn run_workflow_with_registry_policy_and_metadata_sink_summary<R, M>(
967    workflow: &WorkflowDefinition,
968    execution: &ExecutionMetadata,
969    registry: &R,
970    policy: WorkflowRunPolicy,
971    metadata_sink: Arc<M>,
972) -> Result<WorkflowRunSummary>
973where
974    R: NodeExecutorRegistry + ?Sized,
975    M: MetadataSink + 'static,
976{
977    let lifecycle_hook: NoopLifecycleHook = NoopLifecycleHook;
978    run_workflow_with_registry_and_observers_summary_inner(
979        workflow,
980        execution,
981        registry,
982        &lifecycle_hook,
983        metadata_sink,
984        policy,
985        None,
986    )
987    .await
988}
989
990/// Execute the workflow through a registry with an explicit run policy and
991/// emit metadata records.
992///
993/// # Errors
994///
995/// Returns an error if the run policy rejects the workflow shape, executor
996/// resolution fails, metadata collection fails, or any node execution fails.
997pub async fn run_workflow_with_registry_policy_and_metadata_sink<R, M>(
998    workflow: &WorkflowDefinition,
999    execution: &ExecutionMetadata,
1000    registry: &R,
1001    policy: WorkflowRunPolicy,
1002    metadata_sink: Arc<M>,
1003) -> Result<()>
1004where
1005    R: NodeExecutorRegistry + ?Sized,
1006    M: MetadataSink + 'static,
1007{
1008    run_workflow_with_registry_policy_and_metadata_sink_summary(
1009        workflow,
1010        execution,
1011        registry,
1012        policy,
1013        metadata_sink,
1014    )
1015    .await?
1016    .into_result()
1017}
1018
1019/// Execute the workflow through a registry and emit metadata records.
1020///
1021/// # Errors
1022///
1023/// Returns an error if the default run policy rejects the workflow shape or
1024/// executor resolution fails.
1025pub async fn run_workflow_with_registry_and_metadata_sink_summary<R, M>(
1026    workflow: &WorkflowDefinition,
1027    execution: &ExecutionMetadata,
1028    registry: &R,
1029    metadata_sink: Arc<M>,
1030) -> Result<WorkflowRunSummary>
1031where
1032    R: NodeExecutorRegistry + ?Sized,
1033    M: MetadataSink + 'static,
1034{
1035    let lifecycle_hook: NoopLifecycleHook = NoopLifecycleHook;
1036    run_workflow_with_registry_and_observers_summary(
1037        workflow,
1038        execution,
1039        registry,
1040        &lifecycle_hook,
1041        metadata_sink,
1042    )
1043    .await
1044}
1045
1046/// Execute the workflow through a registry and report observer records.
1047///
1048/// # Errors
1049///
1050/// Returns an error if the default run policy rejects the workflow shape or
1051/// executor resolution fails.
1052pub async fn run_workflow_with_registry_and_observers_summary<R, H, M>(
1053    workflow: &WorkflowDefinition,
1054    execution: &ExecutionMetadata,
1055    registry: &R,
1056    lifecycle_hook: &H,
1057    metadata_sink: Arc<M>,
1058) -> Result<WorkflowRunSummary>
1059where
1060    R: NodeExecutorRegistry + ?Sized,
1061    H: LifecycleHook + ?Sized,
1062    M: MetadataSink + 'static,
1063{
1064    run_workflow_with_registry_and_observers_summary_inner(
1065        workflow,
1066        execution,
1067        registry,
1068        lifecycle_hook,
1069        metadata_sink,
1070        WorkflowRunPolicy::default(),
1071        None,
1072    )
1073    .await
1074}
1075
/// Shared implementation behind every registry-based entry point in this module.
///
/// Steps, in order:
/// 1. Check the workflow's topology against `policy`.
/// 2. Create one bounded channel per edge (`build_port_wiring`) and hand each
///    node its input/output port handles.
/// 3. Resolve an executor for every node from `registry` and build one lazy
///    run future per node; all nodes share a single cancellation handle.
/// 4. Drive those futures through `collect_workflow_summary` and return the
///    resulting [`WorkflowRunSummary`].
///
/// When `output_contracts` is `Some`, each node's outputs are wrapped with a
/// [`ContractOutputValidator`] built from that node's per-port
/// `Option<SchemaRef>` map; a workflow node with no entry in
/// `output_contracts` is a setup error.
///
/// # Errors
///
/// Returns an error if policy validation fails, an executor cannot be
/// resolved, output contracts are missing for a node, or the collector fails
/// (for example when the metadata sink rejects a workflow-level record or the
/// deadlock watchdog cannot start). Individual node failures are recorded in
/// the summary instead.
async fn run_workflow_with_registry_and_observers_summary_inner<R, H, M>(
    workflow: &WorkflowDefinition,
    execution: &ExecutionMetadata,
    registry: &R,
    lifecycle_hook: &H,
    metadata_sink: Arc<M>,
    policy: WorkflowRunPolicy,
    output_contracts: Option<&WorkflowOutputContracts>,
) -> Result<WorkflowRunSummary>
where
    R: NodeExecutorRegistry + ?Sized,
    H: LifecycleHook + ?Sized,
    M: MetadataSink + 'static,
{
    validate_workflow_run_policy(workflow, policy)?;

    let (mut inputs_by_node, mut outputs_by_node): PortWiring = build_port_wiring(workflow);
    // One handle for the whole run; every node context receives a token from it
    // so the collector can cancel all nodes at once.
    let cancellation: CancellationHandle = CancellationHandle::new();

    // Async blocks are lazy: nothing pushed here starts running until
    // `collect_workflow_summary` polls the set, so an early `?` inside the loop
    // simply drops the node runs built so far.
    let node_runs: FuturesUnordered<_> = FuturesUnordered::new();
    for node in workflow.nodes() {
        let executor: &R::Executor = registry.executor_for(node.id())?;
        let node_id: NodeId = node.id().clone();
        let ctx: NodeContext =
            NodeContext::new(workflow.id().clone(), node_id.clone(), execution.clone())
                .with_cancellation_token(cancellation.token());
        // Handles are moved out of the wiring maps; a node with no wired edges
        // on a side gets an empty handle list for that side.
        let inputs: PortsIn = PortsIn::from_handles(
            node.input_ports().to_vec(),
            inputs_by_node.remove(node.id()).unwrap_or_default(),
        );
        let mut outputs: PortsOut = PortsOut::from_handles(
            node.output_ports().to_vec(),
            outputs_by_node.remove(node.id()).unwrap_or_default(),
        );
        if let Some(output_contracts) = output_contracts {
            let node_output_contracts: BTreeMap<PortId, Option<SchemaRef>> = output_contracts
                .output_contracts_for(node.id())
                .cloned()
                .ok_or_else(|| {
                    PureflowError::execution(format!(
                        "no output contracts supplied for workflow node `{}`",
                        node.id()
                    ))
                })?;
            outputs = outputs.with_output_validator(Arc::new(ContractOutputValidator::new(
                workflow.id().clone(),
                node_id.clone(),
                execution.clone(),
                node_output_contracts,
            )));
        }
        // Each node run gets its own clone of the sink; the outer `metadata_sink`
        // stays available for workflow-level records emitted during collection.
        let metadata_sink: Arc<M> = metadata_sink.clone();
        node_runs.push(async move {
            let result: Result<()> = run_node_with_observers(
                executor,
                ctx,
                inputs,
                outputs,
                lifecycle_hook,
                metadata_sink,
            )
            .await;
            // Tag the result with its node so the collector can attribute it.
            (node_id, result)
        });
    }

    collect_workflow_summary(
        node_runs,
        WorkflowCollectionContext::new(
            &cancellation,
            workflow,
            execution,
            metadata_sink.as_ref(),
            policy,
        ),
        workflow.nodes().len(),
    )
    .await
}
1155
1156/// Execute the workflow through a registry with output contract validation.
1157///
1158/// # Errors
1159///
1160/// Returns an error if the default run policy rejects the workflow shape,
1161/// executor resolution fails, or output contract setup fails.
1162pub async fn run_workflow_with_registry_contracts_and_observers_summary<R, H, M>(
1163    workflow: &WorkflowDefinition,
1164    execution: &ExecutionMetadata,
1165    registry: &R,
1166    contracts: &[NodeContract],
1167    lifecycle_hook: &H,
1168    metadata_sink: Arc<M>,
1169) -> Result<WorkflowRunSummary>
1170where
1171    R: NodeExecutorRegistry + ?Sized,
1172    H: LifecycleHook + ?Sized,
1173    M: MetadataSink + 'static,
1174{
1175    let output_contracts: WorkflowOutputContracts =
1176        WorkflowOutputContracts::from_node_contracts(workflow, contracts)?;
1177    run_workflow_with_registry_and_observers_summary_inner(
1178        workflow,
1179        execution,
1180        registry,
1181        lifecycle_hook,
1182        metadata_sink,
1183        WorkflowRunPolicy::default(),
1184        Some(&output_contracts),
1185    )
1186    .await
1187}
1188
1189/// Execute the workflow through a registry with output contract validation.
1190///
1191/// # Errors
1192///
1193/// Returns an error if the default run policy rejects the workflow shape,
1194/// executor resolution fails, output contract setup fails, output validation
1195/// fails, observation fails, metadata collection fails, or node execution fails.
1196pub async fn run_workflow_with_registry_contracts_and_observers<R, H, M>(
1197    workflow: &WorkflowDefinition,
1198    execution: &ExecutionMetadata,
1199    registry: &R,
1200    contracts: &[NodeContract],
1201    lifecycle_hook: &H,
1202    metadata_sink: Arc<M>,
1203) -> Result<()>
1204where
1205    R: NodeExecutorRegistry + ?Sized,
1206    H: LifecycleHook + ?Sized,
1207    M: MetadataSink + 'static,
1208{
1209    run_workflow_with_registry_contracts_and_observers_summary(
1210        workflow,
1211        execution,
1212        registry,
1213        contracts,
1214        lifecycle_hook,
1215        metadata_sink,
1216    )
1217    .await?
1218    .into_result()
1219}
1220
1221/// Execute the workflow through a registry with output contract validation.
1222///
1223/// # Errors
1224///
1225/// Returns an error if the default run policy rejects the workflow shape,
1226/// executor resolution fails, or output contract setup fails.
1227pub async fn run_workflow_with_registry_contracts_summary<R>(
1228    workflow: &WorkflowDefinition,
1229    execution: &ExecutionMetadata,
1230    registry: &R,
1231    contracts: &[NodeContract],
1232) -> Result<WorkflowRunSummary>
1233where
1234    R: NodeExecutorRegistry + ?Sized,
1235{
1236    let lifecycle_hook: NoopLifecycleHook = NoopLifecycleHook;
1237    run_workflow_with_registry_contracts_and_observers_summary(
1238        workflow,
1239        execution,
1240        registry,
1241        contracts,
1242        &lifecycle_hook,
1243        Arc::new(NoopMetadataSink),
1244    )
1245    .await
1246}
1247
1248/// Execute the workflow through a registry with output contract validation.
1249///
1250/// # Errors
1251///
1252/// Returns an error if the default run policy rejects the workflow shape,
1253/// executor resolution fails, output contract setup fails, output validation
1254/// fails, or node execution fails.
1255pub async fn run_workflow_with_registry_contracts<R>(
1256    workflow: &WorkflowDefinition,
1257    execution: &ExecutionMetadata,
1258    registry: &R,
1259    contracts: &[NodeContract],
1260) -> Result<()>
1261where
1262    R: NodeExecutorRegistry + ?Sized,
1263{
1264    run_workflow_with_registry_contracts_summary(workflow, execution, registry, contracts)
1265        .await?
1266        .into_result()
1267}
1268
1269/// Execute the workflow through a registry and report observer records.
1270///
1271/// # Errors
1272///
1273/// Returns an error if the default run policy rejects the workflow shape,
1274/// executor resolution fails, observation fails, metadata collection fails, or
1275/// node execution fails.
1276pub async fn run_workflow_with_registry_and_observers<R, H, M>(
1277    workflow: &WorkflowDefinition,
1278    execution: &ExecutionMetadata,
1279    registry: &R,
1280    lifecycle_hook: &H,
1281    metadata_sink: Arc<M>,
1282) -> Result<()>
1283where
1284    R: NodeExecutorRegistry + ?Sized,
1285    H: LifecycleHook + ?Sized,
1286    M: MetadataSink + 'static,
1287{
1288    run_workflow_with_registry_and_observers_summary(
1289        workflow,
1290        execution,
1291        registry,
1292        lifecycle_hook,
1293        metadata_sink,
1294    )
1295    .await?
1296    .into_result()
1297}
1298
1299/// Execute the workflow through a registry and emit metadata records.
1300///
1301/// # Errors
1302///
1303/// Returns an error if the default run policy rejects the workflow shape,
1304/// executor resolution fails, metadata collection fails, or node execution
1305/// fails.
1306pub async fn run_workflow_with_registry_and_metadata_sink<R, M>(
1307    workflow: &WorkflowDefinition,
1308    execution: &ExecutionMetadata,
1309    registry: &R,
1310    metadata_sink: Arc<M>,
1311) -> Result<()>
1312where
1313    R: NodeExecutorRegistry + ?Sized,
1314    M: MetadataSink + 'static,
1315{
1316    run_workflow_with_registry_and_metadata_sink_summary(
1317        workflow,
1318        execution,
1319        registry,
1320        metadata_sink,
1321    )
1322    .await?
1323    .into_result()
1324}
1325
1326/// Execute the workflow by invoking the provided executor for each node.
1327///
1328/// # Errors
1329///
1330/// Returns an error if the default run policy rejects the workflow shape or
1331/// any node execution fails.
1332pub async fn run_workflow<E: NodeExecutor + ?Sized>(
1333    workflow: &WorkflowDefinition,
1334    execution: &ExecutionMetadata,
1335    executor: &E,
1336) -> Result<()> {
1337    let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1338    run_workflow_with_registry(workflow, execution, &registry).await
1339}
1340
1341/// Execute the workflow through one executor and return an aggregate summary.
1342///
1343/// # Errors
1344///
1345/// Returns an error if the default run policy rejects the workflow shape or
1346/// registry-style executor setup fails.
1347pub async fn run_workflow_summary<E: NodeExecutor + ?Sized>(
1348    workflow: &WorkflowDefinition,
1349    execution: &ExecutionMetadata,
1350    executor: &E,
1351) -> Result<WorkflowRunSummary> {
1352    let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1353    run_workflow_with_registry_summary(workflow, execution, &registry).await
1354}
1355
1356/// Execute the workflow through one executor with an explicit run policy.
1357///
1358/// # Errors
1359///
1360/// Returns an error if the run policy rejects the workflow shape or
1361/// registry-style executor setup fails.
1362pub async fn run_workflow_with_policy_summary<E: NodeExecutor + ?Sized>(
1363    workflow: &WorkflowDefinition,
1364    execution: &ExecutionMetadata,
1365    executor: &E,
1366    policy: WorkflowRunPolicy,
1367) -> Result<WorkflowRunSummary> {
1368    let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1369    run_workflow_with_registry_policy_summary(workflow, execution, &registry, policy).await
1370}
1371
1372/// Execute the workflow through one executor with an explicit run policy.
1373///
1374/// # Errors
1375///
1376/// Returns an error if the run policy rejects the workflow shape or any node
1377/// execution fails.
1378pub async fn run_workflow_with_policy<E: NodeExecutor + ?Sized>(
1379    workflow: &WorkflowDefinition,
1380    execution: &ExecutionMetadata,
1381    executor: &E,
1382    policy: WorkflowRunPolicy,
1383) -> Result<()> {
1384    let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1385    run_workflow_with_registry_policy(workflow, execution, &registry, policy).await
1386}
1387
1388/// Execute the workflow through one executor and report observer records.
1389///
1390/// # Errors
1391///
1392/// Returns an error if the default run policy rejects the workflow shape or
1393/// registry-style executor setup fails.
1394pub async fn run_workflow_with_observers_summary<E, H, M>(
1395    workflow: &WorkflowDefinition,
1396    execution: &ExecutionMetadata,
1397    executor: &E,
1398    lifecycle_hook: &H,
1399    metadata_sink: Arc<M>,
1400) -> Result<WorkflowRunSummary>
1401where
1402    E: NodeExecutor + ?Sized,
1403    H: LifecycleHook + ?Sized,
1404    M: MetadataSink + 'static,
1405{
1406    let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1407    run_workflow_with_registry_and_observers_summary(
1408        workflow,
1409        execution,
1410        &registry,
1411        lifecycle_hook,
1412        metadata_sink,
1413    )
1414    .await
1415}
1416
1417/// Execute the workflow through one executor and report observer records.
1418///
1419/// # Errors
1420///
1421/// Returns an error if the default run policy rejects the workflow shape,
1422/// observation fails, metadata collection fails, or node execution fails.
1423pub async fn run_workflow_with_observers<E, H, M>(
1424    workflow: &WorkflowDefinition,
1425    execution: &ExecutionMetadata,
1426    executor: &E,
1427    lifecycle_hook: &H,
1428    metadata_sink: Arc<M>,
1429) -> Result<()>
1430where
1431    E: NodeExecutor + ?Sized,
1432    H: LifecycleHook + ?Sized,
1433    M: MetadataSink + 'static,
1434{
1435    let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1436    run_workflow_with_registry_and_observers(
1437        workflow,
1438        execution,
1439        &registry,
1440        lifecycle_hook,
1441        metadata_sink,
1442    )
1443    .await
1444}
1445
1446/// Execute the workflow through one executor with output contract validation.
1447///
1448/// # Errors
1449///
1450/// Returns an error if the default run policy rejects the workflow shape or
1451/// output contract setup fails.
1452pub async fn run_workflow_with_contracts_summary<E: NodeExecutor + ?Sized>(
1453    workflow: &WorkflowDefinition,
1454    execution: &ExecutionMetadata,
1455    executor: &E,
1456    contracts: &[NodeContract],
1457) -> Result<WorkflowRunSummary> {
1458    let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1459    run_workflow_with_registry_contracts_summary(workflow, execution, &registry, contracts).await
1460}
1461
1462/// Execute the workflow through one executor with output contract validation.
1463///
1464/// # Errors
1465///
1466/// Returns an error if the default run policy rejects the workflow shape,
1467/// output contract setup fails, output validation fails, or node execution
1468/// fails.
1469pub async fn run_workflow_with_contracts<E: NodeExecutor + ?Sized>(
1470    workflow: &WorkflowDefinition,
1471    execution: &ExecutionMetadata,
1472    executor: &E,
1473    contracts: &[NodeContract],
1474) -> Result<()> {
1475    let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1476    run_workflow_with_registry_contracts(workflow, execution, &registry, contracts).await
1477}
1478
1479/// Execute the workflow with one executor and emit metadata records through a sink.
1480///
1481/// # Errors
1482///
1483/// Returns an error if the default run policy rejects the workflow shape,
1484/// metadata collection fails, or any node execution fails.
1485pub async fn run_workflow_with_metadata_sink<E, M>(
1486    workflow: &WorkflowDefinition,
1487    execution: &ExecutionMetadata,
1488    executor: &E,
1489    metadata_sink: Arc<M>,
1490) -> Result<()>
1491where
1492    E: NodeExecutor + ?Sized,
1493    M: MetadataSink + 'static,
1494{
1495    let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1496    run_workflow_with_registry_and_metadata_sink(workflow, execution, &registry, metadata_sink)
1497        .await
1498}
1499
1500/// Execute the workflow with one executor, emit metadata, and return a summary.
1501///
1502/// # Errors
1503///
1504/// Returns an error if the default run policy rejects the workflow shape or
1505/// registry-style executor setup fails.
1506pub async fn run_workflow_with_metadata_sink_summary<E, M>(
1507    workflow: &WorkflowDefinition,
1508    execution: &ExecutionMetadata,
1509    executor: &E,
1510    metadata_sink: Arc<M>,
1511) -> Result<WorkflowRunSummary>
1512where
1513    E: NodeExecutor + ?Sized,
1514    M: MetadataSink + 'static,
1515{
1516    let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1517    run_workflow_with_registry_and_metadata_sink_summary(
1518        workflow,
1519        execution,
1520        &registry,
1521        metadata_sink,
1522    )
1523    .await
1524}
1525
1526#[derive(Clone, Copy)]
1527struct WorkflowCollectionContext<'a> {
1528    cancellation: &'a CancellationHandle,
1529    workflow: &'a WorkflowDefinition,
1530    execution: &'a ExecutionMetadata,
1531    metadata_sink: &'a dyn MetadataSink,
1532    policy: WorkflowRunPolicy,
1533}
1534
1535impl<'a> WorkflowCollectionContext<'a> {
1536    const fn new(
1537        cancellation: &'a CancellationHandle,
1538        workflow: &'a WorkflowDefinition,
1539        execution: &'a ExecutionMetadata,
1540        metadata_sink: &'a dyn MetadataSink,
1541        policy: WorkflowRunPolicy,
1542    ) -> Self {
1543        Self {
1544            cancellation,
1545            workflow,
1546            execution,
1547            metadata_sink,
1548            policy,
1549        }
1550    }
1551}
1552
1553async fn collect_workflow_summary<F>(
1554    mut node_runs: FuturesUnordered<F>,
1555    context: WorkflowCollectionContext<'_>,
1556    scheduled_node_count: usize,
1557) -> Result<WorkflowRunSummary>
1558where
1559    F: Future<Output = (NodeId, Result<()>)>,
1560{
1561    let mut summary: WorkflowRunSummary = WorkflowRunSummary::new(scheduled_node_count);
1562
1563    match context.policy.watchdog_policy() {
1564        WorkflowWatchdogPolicy::Disabled => {
1565            collect_workflow_summary_until_complete(&mut node_runs, context, &mut summary).await?;
1566        }
1567        WorkflowWatchdogPolicy::Deadlock(watchdog) => {
1568            collect_workflow_summary_with_deadlock_watchdog(
1569                &mut node_runs,
1570                context,
1571                watchdog,
1572                &mut summary,
1573            )
1574            .await?;
1575        }
1576    }
1577
1578    Ok(summary)
1579}
1580
1581async fn collect_workflow_summary_until_complete<F>(
1582    node_runs: &mut FuturesUnordered<F>,
1583    context: WorkflowCollectionContext<'_>,
1584    summary: &mut WorkflowRunSummary,
1585) -> Result<()>
1586where
1587    F: Future<Output = (NodeId, Result<()>)>,
1588{
1589    while let Some((_node_id, result)) = node_runs.next().await {
1590        record_node_run_result(context, summary, result)?;
1591    }
1592
1593    Ok(())
1594}
1595
/// Collect node outcomes while enforcing a no-progress deadline.
///
/// Each iteration races the next completed node run against a freshly created
/// deadline of `watchdog.no_progress_timeout()`, so the timeout effectively
/// restarts every time some node finishes. If the deadline wins, a
/// [`WorkflowDeadlockDiagnostic`] is built from the run so far, reported to
/// the metadata sink, cancellation is requested, the failure is stored on the
/// summary, and collection stops. Node runs still pending are left in
/// `node_runs` for the caller to drop.
///
/// NOTE(review): unlike `record_node_run_result`, the deadlock branch does not
/// check `summary.first_error()` before emitting a workflow-failed record, so a
/// run that already reported a node failure can emit a second one. Confirm
/// this is intended.
///
/// # Errors
///
/// Returns an error if the deadline cannot be started or if recording a
/// failure to the metadata sink fails.
async fn collect_workflow_summary_with_deadlock_watchdog<F>(
    node_runs: &mut FuturesUnordered<F>,
    context: WorkflowCollectionContext<'_>,
    watchdog: DeadlockWatchdogPolicy,
    summary: &mut WorkflowRunSummary,
) -> Result<()>
where
    F: Future<Output = (NodeId, Result<()>)>,
{
    loop {
        // All node runs have been collected; nothing left to watch.
        if node_runs.is_empty() {
            return Ok(());
        }

        let next_node_result: Next<'_, FuturesUnordered<F>> = node_runs.next();
        // A new deadline per iteration: any node completion resets the clock.
        let watchdog_deadline: BoxFuture<'static, ()> =
            deadlock_watchdog_deadline(watchdog.no_progress_timeout())?;
        futures::pin_mut!(next_node_result);
        futures::pin_mut!(watchdog_deadline);

        match select(next_node_result, watchdog_deadline).await {
            // A node finished before the deadline; the losing deadline is dropped.
            Either::Left((Some((_node_id, result)), _deadline)) => {
                record_node_run_result(context, summary, result)?;
            }
            Either::Left((None, _deadline)) => return Ok(()),
            // No node made progress within the timeout: report a deadlock.
            Either::Right(((), _next_node_result)) => {
                let diagnostic: WorkflowDeadlockDiagnostic = WorkflowDeadlockDiagnostic::from_run(
                    context.workflow,
                    summary,
                    context.policy,
                    watchdog,
                );
                let err: PureflowError = PureflowError::execution(diagnostic.to_string());
                record_first_workflow_error_with_diagnostic(context, &err, &diagnostic)?;
                summary.record_workflow_error(err);
                summary.record_deadlock_diagnostic(diagnostic);
                return Ok(());
            }
        }
    }
}
1637
1638fn record_node_run_result(
1639    context: WorkflowCollectionContext<'_>,
1640    summary: &mut WorkflowRunSummary,
1641    result: Result<()>,
1642) -> Result<()> {
1643    match result {
1644        Ok(()) => summary.record_success(),
1645        Err(err) => {
1646            if summary.first_error().is_none() {
1647                record_first_workflow_error(context, &err)?;
1648            }
1649            summary.record_error(err);
1650        }
1651    }
1652
1653    Ok(())
1654}
1655
1656fn record_first_workflow_error(
1657    context: WorkflowCollectionContext<'_>,
1658    err: &PureflowError,
1659) -> Result<()> {
1660    let record: MetadataRecord = MetadataRecord::Error(ErrorMetadataRecord::workflow_failed(
1661        context.workflow.id().clone(),
1662        context.execution.clone(),
1663        err.clone(),
1664    ));
1665    context.metadata_sink.record(&record)?;
1666    let _first_request: bool = context
1667        .cancellation
1668        .cancel(CancellationRequest::new(format!(
1669            "node execution failed: {err}"
1670        )));
1671    Ok(())
1672}
1673
1674fn record_first_workflow_error_with_diagnostic(
1675    context: WorkflowCollectionContext<'_>,
1676    err: &PureflowError,
1677    diagnostic: &WorkflowDeadlockDiagnostic,
1678) -> Result<()> {
1679    let record: MetadataRecord =
1680        MetadataRecord::Error(ErrorMetadataRecord::workflow_failed_with_diagnostic(
1681            context.workflow.id().clone(),
1682            context.execution.clone(),
1683            err.clone(),
1684            diagnostic.to_metadata_diagnostic(),
1685        ));
1686    context.metadata_sink.record(&record)?;
1687    let _first_request: bool = context
1688        .cancellation
1689        .cancel(CancellationRequest::new(format!(
1690            "node execution failed: {err}"
1691        )));
1692    Ok(())
1693}
1694
1695fn deadlock_watchdog_deadline(timeout: Duration) -> Result<BoxFuture<'static, ()>> {
1696    let (sender, receiver): (oneshot::Sender<()>, oneshot::Receiver<()>) = oneshot::channel();
1697    std::thread::Builder::new()
1698        .name(String::from("pureflow-deadlock-watchdog"))
1699        .spawn(move || {
1700            std::thread::sleep(timeout);
1701            let _send_result: std::result::Result<(), ()> = sender.send(());
1702        })
1703        .map_err(|err: std::io::Error| {
1704            PureflowError::execution(format!("failed to start workflow deadlock watchdog: {err}"))
1705        })?;
1706
1707    Ok(Box::pin(async move {
1708        let _deadline_result: std::result::Result<(), oneshot::Canceled> = receiver.await;
1709    }))
1710}
1711
1712fn validate_workflow_run_policy(
1713    workflow: &WorkflowDefinition,
1714    policy: WorkflowRunPolicy,
1715) -> Result<()> {
1716    match workflow.graph().topological_order() {
1717        Ok(_order) => Ok(()),
1718        Err(WorkflowValidationError::CycleDetected { cycle }) => {
1719            validate_cycle_run_policy(workflow, policy, &cycle)
1720        }
1721        Err(err) => Err(PureflowError::execution(format!(
1722            "workflow `{}` topology validation failed before execution: {err}",
1723            workflow.id()
1724        ))),
1725    }
1726}
1727
1728fn validate_cycle_run_policy(
1729    workflow: &WorkflowDefinition,
1730    policy: WorkflowRunPolicy,
1731    cycle: &[NodeId],
1732) -> Result<()> {
1733    match policy.cycle_policy() {
1734        CycleRunPolicy::Reject => Err(PureflowError::execution(format!(
1735            "workflow `{}` contains directed cycle {}; use an explicit feedback-loop run policy to execute cyclic graphs",
1736            workflow.id(),
1737            cycle_label(cycle)
1738        ))),
1739        CycleRunPolicy::AllowFeedbackLoops(feedback_loop) => {
1740            match (feedback_loop.startup(), feedback_loop.termination()) {
1741                (FeedbackLoopStartup::StartAllNodes, FeedbackLoopTermination::AllNodesComplete) => {
1742                    Ok(())
1743                }
1744            }
1745        }
1746    }
1747}
1748
1749fn cycle_label(cycle: &[NodeId]) -> String {
1750    let mut label: String = String::new();
1751    for (index, node_id) in cycle.iter().enumerate() {
1752        if index > 0 {
1753            label.push_str(" -> ");
1754        }
1755        label.push_str(node_id.as_str());
1756    }
1757    label
1758}
1759
1760type PortWiring = (
1761    BTreeMap<NodeId, Vec<InputPortHandle>>,
1762    BTreeMap<NodeId, Vec<OutputPortHandle>>,
1763);
1764
1765fn build_port_wiring(workflow: &WorkflowDefinition) -> PortWiring {
1766    let mut inputs_by_node: BTreeMap<NodeId, Vec<InputPortHandle>> = BTreeMap::new();
1767    let mut outputs_by_node: BTreeMap<NodeId, Vec<OutputPortHandle>> = BTreeMap::new();
1768
1769    for edge in workflow.edges() {
1770        let capacity: NonZeroUsize = edge.capacity().resolve(DEFAULT_EDGE_CAPACITY);
1771        let (output, input): (OutputPortHandle, InputPortHandle) = bounded_edge_channel(
1772            edge.source().port_id().clone(),
1773            edge.target().port_id().clone(),
1774            capacity,
1775        );
1776        outputs_by_node
1777            .entry(edge.source().node_id().clone())
1778            .or_default()
1779            .push(output);
1780        inputs_by_node
1781            .entry(edge.target().node_id().clone())
1782            .or_default()
1783            .push(input);
1784    }
1785
1786    (inputs_by_node, outputs_by_node)
1787}
1788
1789#[cfg(test)]
1790mod tests {
1791    use super::*;
1792    use std::{
1793        collections::BTreeMap,
1794        future::{Ready, ready},
1795        sync::{Arc, Mutex},
1796        time::Duration,
1797    };
1798
1799    use pureflow_contract::{Determinism, ExecutionMode, PortContract, SchemaRef};
1800    use pureflow_core::{
1801        BatchExecutor, BatchInputs, BatchOutputs, PureflowError, ErrorCode, ErrorDiagnosticMetadata,
1802        ErrorMetadataKind, MetadataRecord, MetadataSink, PacketPayload, PortPacket, PortRecvError,
1803        PortSendError, RetryDisposition,
1804        lifecycle::{LifecycleEvent, LifecycleEventKind},
1805        message::{MessageEndpoint, MessageMetadata, MessageRoute},
1806    };
1807    use pureflow_test_kit::{
1808        FailingExecutor, NodeBuilder, RecordingExecutor, WorkflowBuilder, execution_metadata,
1809        node_id, port_id, workflow_id,
1810    };
1811    use pureflow_types::{ExecutionId, MessageId};
1812    use pureflow_workflow::EdgeDefinition;
1813    use futures::channel::oneshot;
1814    use futures::executor::block_on;
1815    use futures::future::BoxFuture;
1816
1817    #[derive(Debug, Default)]
1818    struct ChannelExecutor {
1819        received: Mutex<Vec<Vec<u8>>>,
1820    }
1821
1822    impl ChannelExecutor {
1823        fn received_payloads(&self) -> Vec<Vec<u8>> {
1824            self.received
1825                .lock()
1826                .expect("channel executor lock should not be poisoned")
1827                .clone()
1828        }
1829    }
1830
1831    impl NodeExecutor for ChannelExecutor {
1832        type RunFuture<'a> = BoxFuture<'a, Result<()>>;
1833
1834        fn run(
1835            &self,
1836            ctx: NodeContext,
1837            mut inputs: PortsIn,
1838            outputs: PortsOut,
1839        ) -> Self::RunFuture<'_> {
1840            Box::pin(async move {
1841                if ctx.node_id().as_str() == "source" {
1842                    let cancellation = ctx.cancellation_token();
1843                    outputs
1844                        .send(&port_id("out"), packet(b"hello"), &cancellation)
1845                        .await?;
1846                    outputs
1847                        .send(&port_id("out"), packet(b"world"), &cancellation)
1848                        .await?;
1849                } else if ctx.node_id().as_str() == "sink" {
1850                    let cancellation = ctx.cancellation_token();
1851                    for _packet_index in 0..2 {
1852                        let packet: PortPacket = inputs
1853                            .recv(&port_id("in"), &cancellation)
1854                            .await?
1855                            .expect("source should have queued a packet");
1856                        self.received
1857                            .lock()
1858                            .expect("channel executor lock should not be poisoned")
1859                            .push(
1860                                packet
1861                                    .into_payload()
1862                                    .as_bytes()
1863                                    .expect("channel test sends bytes")
1864                                    .to_vec(),
1865                            );
1866                    }
1867                }
1868
1869                Ok(())
1870            })
1871        }
1872    }
1873
1874    #[derive(Debug, Clone, Copy)]
1875    enum RegistryExecutorRole {
1876        Source,
1877        Sink,
1878    }
1879
1880    #[derive(Debug, Clone)]
1881    struct RegistryExecutor {
1882        role: RegistryExecutorRole,
1883        received: Arc<Mutex<Vec<Vec<u8>>>>,
1884    }
1885
1886    impl RegistryExecutor {
1887        fn source(received: Arc<Mutex<Vec<Vec<u8>>>>) -> Self {
1888            Self {
1889                role: RegistryExecutorRole::Source,
1890                received,
1891            }
1892        }
1893
1894        fn sink(received: Arc<Mutex<Vec<Vec<u8>>>>) -> Self {
1895            Self {
1896                role: RegistryExecutorRole::Sink,
1897                received,
1898            }
1899        }
1900    }
1901
1902    impl NodeExecutor for RegistryExecutor {
1903        type RunFuture<'a> = BoxFuture<'a, Result<()>>;
1904
1905        fn run(
1906            &self,
1907            ctx: NodeContext,
1908            mut inputs: PortsIn,
1909            outputs: PortsOut,
1910        ) -> Self::RunFuture<'_> {
1911            Box::pin(async move {
1912                let cancellation = ctx.cancellation_token();
1913                match self.role {
1914                    RegistryExecutorRole::Source => {
1915                        outputs
1916                            .send(&port_id("out"), packet(b"registered"), &cancellation)
1917                            .await?;
1918                    }
1919                    RegistryExecutorRole::Sink => {
1920                        let packet: PortPacket = inputs
1921                            .recv(&port_id("in"), &cancellation)
1922                            .await?
1923                            .expect("registered source should send one packet");
1924                        self.received
1925                            .lock()
1926                            .expect("registry executor lock should not be poisoned")
1927                            .push(packet_payload_bytes(packet));
1928                    }
1929                }
1930
1931                Ok(())
1932            })
1933        }
1934    }
1935
1936    #[derive(Debug, Default)]
1937    struct BoundedBackpressureExecutor {
1938        events: Mutex<Vec<String>>,
1939        received: Mutex<Vec<Vec<u8>>>,
1940    }
1941
1942    impl BoundedBackpressureExecutor {
1943        fn events(&self) -> Vec<String> {
1944            self.events
1945                .lock()
1946                .expect("backpressure executor events lock should not be poisoned")
1947                .clone()
1948        }
1949
1950        fn received_payloads(&self) -> Vec<Vec<u8>> {
1951            self.received
1952                .lock()
1953                .expect("backpressure executor received lock should not be poisoned")
1954                .clone()
1955        }
1956
1957        fn push_event(&self, event: &str) {
1958            self.events
1959                .lock()
1960                .expect("backpressure executor events lock should not be poisoned")
1961                .push(event.to_owned());
1962        }
1963
1964        fn push_received(&self, packet: PortPacket) {
1965            self.received
1966                .lock()
1967                .expect("backpressure executor received lock should not be poisoned")
1968                .push(packet_payload_bytes(packet));
1969        }
1970    }
1971
1972    impl NodeExecutor for BoundedBackpressureExecutor {
1973        type RunFuture<'a> = BoxFuture<'a, Result<()>>;
1974
1975        fn run(
1976            &self,
1977            ctx: NodeContext,
1978            mut inputs: PortsIn,
1979            outputs: PortsOut,
1980        ) -> Self::RunFuture<'_> {
1981            Box::pin(async move {
1982                let cancellation = ctx.cancellation_token();
1983
1984                match ctx.node_id().as_str() {
1985                    "source" => {
1986                        outputs
1987                            .send(
1988                                &port_id("out"),
1989                                packet_between(b"first", "source", "sink"),
1990                                &cancellation,
1991                            )
1992                            .await?;
1993
1994                        let full_send: std::result::Result<(), PortSendError> = outputs.try_send(
1995                            &port_id("out"),
1996                            packet_between(b"blocked", "source", "sink"),
1997                        );
1998                        if matches!(full_send, Err(PortSendError::Full { .. })) {
1999                            self.push_event("source-observed-full-edge");
2000                        } else {
2001                            return Err(PureflowError::execution(
2002                                "bounded edge should reject a second immediate send",
2003                            ));
2004                        }
2005
2006                        outputs
2007                            .send(
2008                                &port_id("out"),
2009                                packet_between(b"second", "source", "sink"),
2010                                &cancellation,
2011                            )
2012                            .await?;
2013                        self.push_event("source-second-send-completed");
2014                    }
2015                    "sink" => {
2016                        for _packet_index in 0..2 {
2017                            let packet: PortPacket = inputs
2018                                .recv(&port_id("in"), &cancellation)
2019                                .await?
2020                                .expect("source should send two packets");
2021                            self.push_received(packet);
2022                        }
2023                    }
2024                    _ => {}
2025                }
2026
2027                Ok(())
2028            })
2029        }
2030    }
2031
2032    #[derive(Debug, Default)]
2033    struct FanOutExecutor {
2034        received_by_node: Mutex<BTreeMap<String, Vec<Vec<u8>>>>,
2035    }
2036
2037    impl FanOutExecutor {
2038        fn received_by_node(&self) -> BTreeMap<String, Vec<Vec<u8>>> {
2039            self.received_by_node
2040                .lock()
2041                .expect("fan-out executor lock should not be poisoned")
2042                .clone()
2043        }
2044
2045        fn push_received(&self, node_id: &str, packet: PortPacket) {
2046            self.received_by_node
2047                .lock()
2048                .expect("fan-out executor lock should not be poisoned")
2049                .entry(node_id.to_owned())
2050                .or_default()
2051                .push(packet_payload_bytes(packet));
2052        }
2053    }
2054
2055    impl NodeExecutor for FanOutExecutor {
2056        type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2057
2058        fn run(
2059            &self,
2060            ctx: NodeContext,
2061            mut inputs: PortsIn,
2062            outputs: PortsOut,
2063        ) -> Self::RunFuture<'_> {
2064            Box::pin(async move {
2065                let cancellation = ctx.cancellation_token();
2066
2067                if ctx.node_id().as_str() == "source" {
2068                    outputs
2069                        .send(
2070                            &port_id("out"),
2071                            packet_between(b"fan", "source", "left"),
2072                            &cancellation,
2073                        )
2074                        .await?;
2075                    return Ok(());
2076                }
2077
2078                let node_name: String = ctx.node_id().to_string();
2079                let packet: PortPacket = inputs
2080                    .recv(&port_id("in"), &cancellation)
2081                    .await?
2082                    .expect("fan-out sink should receive one packet");
2083                self.push_received(&node_name, packet);
2084
2085                Ok(())
2086            })
2087        }
2088    }
2089
2090    #[derive(Debug, Default)]
2091    struct FanInClosureExecutor {
2092        received: Mutex<Vec<Vec<u8>>>,
2093        closure_observed: Mutex<bool>,
2094    }
2095
2096    impl FanInClosureExecutor {
2097        fn received_payloads(&self) -> Vec<Vec<u8>> {
2098            self.received
2099                .lock()
2100                .expect("fan-in executor received lock should not be poisoned")
2101                .clone()
2102        }
2103
2104        fn closure_observed(&self) -> bool {
2105            *self
2106                .closure_observed
2107                .lock()
2108                .expect("fan-in executor closure lock should not be poisoned")
2109        }
2110
2111        fn push_received(&self, packet: PortPacket) {
2112            self.received
2113                .lock()
2114                .expect("fan-in executor received lock should not be poisoned")
2115                .push(packet_payload_bytes(packet));
2116        }
2117    }
2118
2119    impl NodeExecutor for FanInClosureExecutor {
2120        type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2121
2122        fn run(
2123            &self,
2124            ctx: NodeContext,
2125            mut inputs: PortsIn,
2126            outputs: PortsOut,
2127        ) -> Self::RunFuture<'_> {
2128            Box::pin(async move {
2129                let cancellation = ctx.cancellation_token();
2130
2131                match ctx.node_id().as_str() {
2132                    "left" | "right" => {
2133                        let source_node: String = ctx.node_id().to_string();
2134                        outputs
2135                            .send(
2136                                &port_id("out"),
2137                                packet_between(source_node.as_bytes(), &source_node, "collector"),
2138                                &cancellation,
2139                            )
2140                            .await?;
2141                    }
2142                    "collector" => {
2143                        for _packet_index in 0..2 {
2144                            let packet: PortPacket = inputs
2145                                .recv(&port_id("in"), &cancellation)
2146                                .await?
2147                                .expect("fan-in collector should receive both packets");
2148                            self.push_received(packet);
2149                        }
2150
2151                        let closed: std::result::Result<Option<PortPacket>, PortRecvError> =
2152                            inputs.recv(&port_id("in"), &cancellation).await;
2153                        if matches!(closed, Err(PortRecvError::Disconnected { .. })) {
2154                            *self
2155                                .closure_observed
2156                                .lock()
2157                                .expect("fan-in executor closure lock should not be poisoned") =
2158                                true;
2159                        } else {
2160                            return Err(PureflowError::execution(
2161                                "fan-in input should close after upstream senders finish",
2162                            ));
2163                        }
2164                    }
2165                    _ => {}
2166                }
2167
2168                Ok(())
2169            })
2170        }
2171    }
2172
2173    #[derive(Debug, Default)]
2174    struct AggregateFailureExecutor {
2175        visited: Mutex<Vec<String>>,
2176    }
2177
2178    impl AggregateFailureExecutor {
2179        fn visited_node_names(&self) -> Vec<String> {
2180            self.visited
2181                .lock()
2182                .expect("aggregate failure executor lock should not be poisoned")
2183                .clone()
2184        }
2185    }
2186
2187    impl NodeExecutor for AggregateFailureExecutor {
2188        type RunFuture<'a> = Ready<Result<()>>;
2189
2190        fn run(
2191            &self,
2192            ctx: NodeContext,
2193            _inputs: PortsIn,
2194            _outputs: PortsOut,
2195        ) -> Self::RunFuture<'_> {
2196            self.visited
2197                .lock()
2198                .expect("aggregate failure executor lock should not be poisoned")
2199                .push(ctx.node_id().to_string());
2200
2201            if ctx.node_id().as_str() == "first" {
2202                return ready(Err(PureflowError::execution("first failed")));
2203            }
2204
2205            ready(Ok(()))
2206        }
2207    }
2208
2209    #[derive(Debug, Clone, Copy)]
2210    enum FailureMatrixRole {
2211        SourceFails,
2212        SourceForTransformFailure,
2213        TransformFails,
2214        PassthroughTransform,
2215        SinkWaits,
2216    }
2217
2218    #[derive(Debug, Clone, Copy)]
2219    struct FailureMatrixExecutor {
2220        role: FailureMatrixRole,
2221    }
2222
2223    impl FailureMatrixExecutor {
2224        const fn source_fails() -> Self {
2225            Self {
2226                role: FailureMatrixRole::SourceFails,
2227            }
2228        }
2229
2230        const fn source_for_transform_failure() -> Self {
2231            Self {
2232                role: FailureMatrixRole::SourceForTransformFailure,
2233            }
2234        }
2235
2236        const fn transform_fails() -> Self {
2237            Self {
2238                role: FailureMatrixRole::TransformFails,
2239            }
2240        }
2241
2242        const fn passthrough_transform() -> Self {
2243            Self {
2244                role: FailureMatrixRole::PassthroughTransform,
2245            }
2246        }
2247
2248        const fn sink_waits() -> Self {
2249            Self {
2250                role: FailureMatrixRole::SinkWaits,
2251            }
2252        }
2253    }
2254
2255    impl NodeExecutor for FailureMatrixExecutor {
2256        type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2257
2258        fn run(
2259            &self,
2260            ctx: NodeContext,
2261            mut inputs: PortsIn,
2262            outputs: PortsOut,
2263        ) -> Self::RunFuture<'_> {
2264            Box::pin(async move {
2265                let cancellation = ctx.cancellation_token();
2266
2267                match self.role {
2268                    FailureMatrixRole::SourceFails => {
2269                        Err(PureflowError::execution("matrix source failed"))
2270                    }
2271                    FailureMatrixRole::SourceForTransformFailure => outputs
2272                        .send(
2273                            &port_id("out"),
2274                            packet_between(b"source", "source", "transform"),
2275                            &cancellation,
2276                        )
2277                        .await
2278                        .map_err(PureflowError::from),
2279                    FailureMatrixRole::TransformFails => {
2280                        let _packet = inputs.recv(&port_id("in"), &cancellation).await?;
2281                        Err(PureflowError::execution("matrix transform failed"))
2282                    }
2283                    FailureMatrixRole::PassthroughTransform => {
2284                        let packet = inputs
2285                            .recv(&port_id("in"), &cancellation)
2286                            .await?
2287                            .expect("source should send transform input");
2288                        outputs
2289                            .send(&port_id("out"), packet, &cancellation)
2290                            .await
2291                            .map_err(PureflowError::from)
2292                    }
2293                    FailureMatrixRole::SinkWaits => {
2294                        let _packet = inputs.recv(&port_id("in"), &cancellation).await?;
2295                        Ok(())
2296                    }
2297                }
2298            })
2299        }
2300    }
2301
2302    #[derive(Debug)]
2303    struct DisconnectedDownstreamExecutor {
2304        role: DisconnectedDownstreamRole,
2305        signal: Arc<DisconnectedDownstreamSignal>,
2306    }
2307
2308    #[derive(Debug, Clone, Copy)]
2309    enum DisconnectedDownstreamRole {
2310        Source,
2311        Sink,
2312    }
2313
2314    #[derive(Debug)]
2315    struct DisconnectedDownstreamSignal {
2316        sender: Mutex<Option<oneshot::Sender<()>>>,
2317        receiver: Mutex<Option<oneshot::Receiver<()>>>,
2318    }
2319
2320    impl DisconnectedDownstreamExecutor {
2321        fn pair() -> (Self, Self) {
2322            let (sender, receiver) = oneshot::channel();
2323            let signal = Arc::new(DisconnectedDownstreamSignal {
2324                sender: Mutex::new(Some(sender)),
2325                receiver: Mutex::new(Some(receiver)),
2326            });
2327
2328            (
2329                Self {
2330                    role: DisconnectedDownstreamRole::Source,
2331                    signal: Arc::clone(&signal),
2332                },
2333                Self {
2334                    role: DisconnectedDownstreamRole::Sink,
2335                    signal,
2336                },
2337            )
2338        }
2339    }
2340
2341    impl NodeExecutor for DisconnectedDownstreamExecutor {
2342        type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2343
2344        fn run(
2345            &self,
2346            ctx: NodeContext,
2347            _inputs: PortsIn,
2348            outputs: PortsOut,
2349        ) -> Self::RunFuture<'_> {
2350            Box::pin(async move {
2351                match self.role {
2352                    DisconnectedDownstreamRole::Source => {
2353                        let receiver = self
2354                            .signal
2355                            .receiver
2356                            .lock()
2357                            .expect("disconnect receiver lock should not be poisoned")
2358                            .take()
2359                            .expect("source should own disconnect receiver");
2360                        receiver
2361                            .await
2362                            .expect("sink should signal before source sends");
2363                        outputs
2364                            .send(
2365                                &port_id("out"),
2366                                packet_between(b"late", "source", "sink"),
2367                                &ctx.cancellation_token(),
2368                            )
2369                            .await
2370                            .map_err(PureflowError::from)
2371                    }
2372                    DisconnectedDownstreamRole::Sink => {
2373                        if let Some(sender) = self
2374                            .signal
2375                            .sender
2376                            .lock()
2377                            .expect("disconnect sender lock should not be poisoned")
2378                            .take()
2379                        {
2380                            let _send_result = sender.send(());
2381                        }
2382                        Ok(())
2383                    }
2384                }
2385            })
2386        }
2387    }
2388
2389    #[derive(Debug, Clone, Copy)]
2390    enum FanOutPartialFailureRole {
2391        Source,
2392        GoodSink,
2393        DroppingSink,
2394    }
2395
2396    #[derive(Debug)]
2397    struct FanOutPartialFailureExecutor {
2398        role: FanOutPartialFailureRole,
2399        state: Arc<FanOutPartialFailureState>,
2400    }
2401
2402    #[derive(Debug)]
2403    struct FanOutPartialFailureState {
2404        dropped_sender: Mutex<Option<oneshot::Sender<()>>>,
2405        dropped_receiver: Mutex<Option<oneshot::Receiver<()>>>,
2406        good_sink_observation: Mutex<Option<String>>,
2407    }
2408
2409    impl FanOutPartialFailureExecutor {
2410        fn registry() -> StaticNodeExecutorRegistry<Self> {
2411            let (dropped_sender, dropped_receiver) = oneshot::channel();
2412            let state = Arc::new(FanOutPartialFailureState {
2413                dropped_sender: Mutex::new(Some(dropped_sender)),
2414                dropped_receiver: Mutex::new(Some(dropped_receiver)),
2415                good_sink_observation: Mutex::new(None),
2416            });
2417
2418            StaticNodeExecutorRegistry::new(BTreeMap::from([
2419                (
2420                    node_id("source"),
2421                    Self {
2422                        role: FanOutPartialFailureRole::Source,
2423                        state: Arc::clone(&state),
2424                    },
2425                ),
2426                (
2427                    node_id("good"),
2428                    Self {
2429                        role: FanOutPartialFailureRole::GoodSink,
2430                        state: Arc::clone(&state),
2431                    },
2432                ),
2433                (
2434                    node_id("drop"),
2435                    Self {
2436                        role: FanOutPartialFailureRole::DroppingSink,
2437                        state,
2438                    },
2439                ),
2440            ]))
2441        }
2442
2443        fn good_sink_observation(&self) -> Option<String> {
2444            self.state
2445                .good_sink_observation
2446                .lock()
2447                .expect("fan-out partial observation lock should not be poisoned")
2448                .clone()
2449        }
2450    }
2451
2452    impl NodeExecutor for FanOutPartialFailureExecutor {
2453        type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2454
2455        fn run(
2456            &self,
2457            ctx: NodeContext,
2458            mut inputs: PortsIn,
2459            outputs: PortsOut,
2460        ) -> Self::RunFuture<'_> {
2461            Box::pin(async move {
2462                match self.role {
2463                    FanOutPartialFailureRole::Source => {
2464                        let receiver = self
2465                            .state
2466                            .dropped_receiver
2467                            .lock()
2468                            .expect("fan-out dropped receiver lock should not be poisoned")
2469                            .take()
2470                            .expect("source should own dropped receiver");
2471                        receiver
2472                            .await
2473                            .expect("dropping sink should signal before source sends");
2474                        outputs
2475                            .send(
2476                                &port_id("out"),
2477                                packet_between(b"fan", "source", "good"),
2478                                &ctx.cancellation_token(),
2479                            )
2480                            .await
2481                            .map_err(PureflowError::from)
2482                    }
2483                    FanOutPartialFailureRole::GoodSink => {
2484                        let observation =
2485                            match inputs.recv(&port_id("in"), &ctx.cancellation_token()).await {
2486                                Ok(Some(_packet)) => "unexpected_packet",
2487                                Ok(None) => "closed_without_packet",
2488                                Err(PortRecvError::Cancelled { .. }) => "cancelled_without_packet",
2489                                Err(PortRecvError::Disconnected { .. }) => {
2490                                    "disconnected_without_packet"
2491                                }
2492                                Err(PortRecvError::UnknownPort { .. }) => "unknown_port",
2493                            };
2494                        *self
2495                            .state
2496                            .good_sink_observation
2497                            .lock()
2498                            .expect("fan-out partial observation lock should not be poisoned") =
2499                            Some(observation.to_owned());
2500                        if observation == "unexpected_packet" {
2501                            return Err(PureflowError::execution(
2502                                "fan-out partial send delivered to good sink",
2503                            ));
2504                        }
2505                        Ok(())
2506                    }
2507                    FanOutPartialFailureRole::DroppingSink => {
2508                        if let Some(sender) = self
2509                            .state
2510                            .dropped_sender
2511                            .lock()
2512                            .expect("fan-out dropped sender lock should not be poisoned")
2513                            .take()
2514                        {
2515                            let _send_result = sender.send(());
2516                        }
2517                        Ok(())
2518                    }
2519                }
2520            })
2521        }
2522    }
2523
2524    #[derive(Debug, Default)]
2525    struct CancelledExecutor;
2526
2527    impl NodeExecutor for CancelledExecutor {
2528        type RunFuture<'a> = Ready<Result<()>>;
2529
2530        fn run(
2531            &self,
2532            _ctx: NodeContext,
2533            _inputs: PortsIn,
2534            _outputs: PortsOut,
2535        ) -> Self::RunFuture<'_> {
2536            ready(Err(PureflowError::cancelled("test cancellation")))
2537        }
2538    }
2539
2540    #[derive(Debug, Default)]
2541    struct WaitingInputExecutor {
2542        visited: Mutex<Vec<String>>,
2543    }
2544
2545    impl WaitingInputExecutor {
2546        fn visited_node_names(&self) -> Vec<String> {
2547            self.visited
2548                .lock()
2549                .expect("waiting input executor lock should not be poisoned")
2550                .clone()
2551        }
2552    }
2553
2554    impl NodeExecutor for WaitingInputExecutor {
2555        type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2556
2557        fn run(
2558            &self,
2559            ctx: NodeContext,
2560            mut inputs: PortsIn,
2561            outputs: PortsOut,
2562        ) -> Self::RunFuture<'_> {
2563            Box::pin(async move {
2564                let _held_outputs = outputs;
2565                self.visited
2566                    .lock()
2567                    .expect("waiting input executor lock should not be poisoned")
2568                    .push(ctx.node_id().to_string());
2569
2570                let _packet = inputs
2571                    .recv(&port_id("in"), &ctx.cancellation_token())
2572                    .await?;
2573
2574                Ok(())
2575            })
2576        }
2577    }
2578
2579    #[derive(Debug, Clone, Copy)]
2580    enum FeedbackLoopExecutorRole {
2581        Driver,
2582        Counter,
2583    }
2584
2585    #[derive(Debug, Clone)]
2586    struct FeedbackLoopExecutor {
2587        role: FeedbackLoopExecutorRole,
2588        observed: Arc<Mutex<Vec<String>>>,
2589    }
2590
2591    impl FeedbackLoopExecutor {
2592        fn driver(observed: Arc<Mutex<Vec<String>>>) -> Self {
2593            Self {
2594                role: FeedbackLoopExecutorRole::Driver,
2595                observed,
2596            }
2597        }
2598
2599        fn counter(observed: Arc<Mutex<Vec<String>>>) -> Self {
2600            Self {
2601                role: FeedbackLoopExecutorRole::Counter,
2602                observed,
2603            }
2604        }
2605
2606        fn push_observed(&self, value: String) {
2607            self.observed
2608                .lock()
2609                .expect("feedback loop executor lock should not be poisoned")
2610                .push(value);
2611        }
2612    }
2613
2614    impl NodeExecutor for FeedbackLoopExecutor {
2615        type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2616
2617        fn run(
2618            &self,
2619            ctx: NodeContext,
2620            mut inputs: PortsIn,
2621            outputs: PortsOut,
2622        ) -> Self::RunFuture<'_> {
2623            Box::pin(async move {
2624                let cancellation = ctx.cancellation_token();
2625                match self.role {
2626                    FeedbackLoopExecutorRole::Driver => {
2627                        outputs
2628                            .send(
2629                                &port_id("out"),
2630                                packet_between(b"seed", "first", "second"),
2631                                &cancellation,
2632                            )
2633                            .await?;
2634                        let packet: PortPacket = inputs
2635                            .recv(&port_id("in"), &cancellation)
2636                            .await?
2637                            .expect("counter should return one packet");
2638                        self.push_observed(format!(
2639                            "driver:{}",
2640                            String::from_utf8(packet_payload_bytes(packet))
2641                                .expect("feedback loop test payload should be UTF-8")
2642                        ));
2643                    }
2644                    FeedbackLoopExecutorRole::Counter => {
2645                        let packet: PortPacket = inputs
2646                            .recv(&port_id("in"), &cancellation)
2647                            .await?
2648                            .expect("driver should seed the loop");
2649                        self.push_observed(format!(
2650                            "counter:{}",
2651                            String::from_utf8(packet_payload_bytes(packet))
2652                                .expect("feedback loop test payload should be UTF-8")
2653                        ));
2654                        outputs
2655                            .send(
2656                                &port_id("out"),
2657                                packet_between(b"ack", "second", "first"),
2658                                &cancellation,
2659                            )
2660                            .await?;
2661                    }
2662                }
2663
2664                Ok(())
2665            })
2666        }
2667    }
2668
2669    #[derive(Debug, Clone, Copy)]
2670    enum FeedbackLoopShutdownRole {
2671        Failing,
2672        ShutdownWatcher,
2673    }
2674
2675    #[derive(Debug, Clone)]
2676    struct FeedbackLoopShutdownExecutor {
2677        role: FeedbackLoopShutdownRole,
2678        cancellation_observed: Arc<Mutex<bool>>,
2679    }
2680
2681    impl FeedbackLoopShutdownExecutor {
2682        fn failing(cancellation_observed: Arc<Mutex<bool>>) -> Self {
2683            Self {
2684                role: FeedbackLoopShutdownRole::Failing,
2685                cancellation_observed,
2686            }
2687        }
2688
2689        fn shutdown_watcher(cancellation_observed: Arc<Mutex<bool>>) -> Self {
2690            Self {
2691                role: FeedbackLoopShutdownRole::ShutdownWatcher,
2692                cancellation_observed,
2693            }
2694        }
2695    }
2696
2697    impl NodeExecutor for FeedbackLoopShutdownExecutor {
2698        type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2699
2700        fn run(
2701            &self,
2702            ctx: NodeContext,
2703            _inputs: PortsIn,
2704            _outputs: PortsOut,
2705        ) -> Self::RunFuture<'_> {
2706            Box::pin(async move {
2707                match self.role {
2708                    FeedbackLoopShutdownRole::Failing => {
2709                        Err(PureflowError::execution("feedback loop shutdown requested"))
2710                    }
2711                    FeedbackLoopShutdownRole::ShutdownWatcher => {
2712                        let cancellation = ctx.cancellation_token();
2713                        std::future::poll_fn(|task_cx: &mut std::task::Context<'_>| {
2714                            if cancellation.is_cancelled() {
2715                                *self
2716                                    .cancellation_observed
2717                                    .lock()
2718                                    .expect("shutdown executor lock should not be poisoned") = true;
2719                                std::task::Poll::Ready(Ok(()))
2720                            } else {
2721                                task_cx.waker().wake_by_ref();
2722                                std::task::Poll::Pending
2723                            }
2724                        })
2725                        .await
2726                    }
2727                }
2728            })
2729        }
2730    }
2731
2732    #[derive(Debug, Clone, Copy)]
2733    enum ContractOutputMode {
2734        MatchingSource,
2735        MismatchedSource,
2736    }
2737
2738    #[derive(Debug, Clone, Copy)]
2739    enum ContractBatchOutputMode {
2740        MatchingSource,
2741        MismatchedSource,
2742        UnknownPort,
2743    }
2744
2745    #[derive(Debug, Clone, Copy)]
2746    struct ContractOutputExecutor {
2747        mode: ContractOutputMode,
2748    }
2749
2750    #[derive(Debug, Clone, Copy)]
2751    struct ContractBatchExecutor {
2752        mode: ContractBatchOutputMode,
2753    }
2754
2755    impl ContractOutputExecutor {
2756        const fn matching_source() -> Self {
2757            Self {
2758                mode: ContractOutputMode::MatchingSource,
2759            }
2760        }
2761
2762        const fn mismatched_source() -> Self {
2763            Self {
2764                mode: ContractOutputMode::MismatchedSource,
2765            }
2766        }
2767    }
2768
2769    impl ContractBatchExecutor {
2770        const fn matching_source() -> Self {
2771            Self {
2772                mode: ContractBatchOutputMode::MatchingSource,
2773            }
2774        }
2775
2776        const fn mismatched_source() -> Self {
2777            Self {
2778                mode: ContractBatchOutputMode::MismatchedSource,
2779            }
2780        }
2781
2782        const fn unknown_port() -> Self {
2783            Self {
2784                mode: ContractBatchOutputMode::UnknownPort,
2785            }
2786        }
2787    }
2788
2789    impl NodeExecutor for ContractOutputExecutor {
2790        type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2791
2792        fn run(
2793            &self,
2794            ctx: NodeContext,
2795            _inputs: PortsIn,
2796            outputs: PortsOut,
2797        ) -> Self::RunFuture<'_> {
2798            Box::pin(async move {
2799                let source_node: &str = match self.mode {
2800                    ContractOutputMode::MatchingSource => "source",
2801                    ContractOutputMode::MismatchedSource => "other",
2802                };
2803                outputs
2804                    .send(
2805                        &port_id("out"),
2806                        packet_between(b"contracted", source_node, "sink"),
2807                        &ctx.cancellation_token(),
2808                    )
2809                    .await?;
2810                Ok(())
2811            })
2812        }
2813    }
2814
2815    impl BatchExecutor for ContractBatchExecutor {
2816        fn invoke(&self, _inputs: BatchInputs) -> Result<BatchOutputs> {
2817            let (output_port, source_node): (&str, &str) = match self.mode {
2818                ContractBatchOutputMode::MatchingSource => ("out", "source"),
2819                ContractBatchOutputMode::MismatchedSource => ("out", "other"),
2820                ContractBatchOutputMode::UnknownPort => ("rogue", "source"),
2821            };
2822            let mut outputs: BatchOutputs = BatchOutputs::new();
2823            outputs.push(
2824                port_id(output_port),
2825                packet_between(b"contracted", source_node, "sink"),
2826            );
2827            Ok(outputs)
2828        }
2829    }
2830
2831    #[derive(Debug)]
2832    struct RecordingSinkExecutor {
2833        received: Arc<Mutex<Vec<Vec<u8>>>>,
2834    }
2835
2836    impl RecordingSinkExecutor {
2837        fn new(received: Arc<Mutex<Vec<Vec<u8>>>>) -> Self {
2838            Self { received }
2839        }
2840    }
2841
2842    impl NodeExecutor for RecordingSinkExecutor {
2843        type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2844
2845        fn run(
2846            &self,
2847            ctx: NodeContext,
2848            mut inputs: PortsIn,
2849            _outputs: PortsOut,
2850        ) -> Self::RunFuture<'_> {
2851            Box::pin(async move {
2852                match inputs.recv(&port_id("in"), &ctx.cancellation_token()).await {
2853                    Ok(Some(packet)) => {
2854                        self.received
2855                            .lock()
2856                            .expect("recording sink lock should not be poisoned")
2857                            .push(packet_payload_bytes(packet));
2858                    }
2859                    Ok(None)
2860                    | Err(PortRecvError::Disconnected { .. } | PortRecvError::Cancelled { .. }) => {
2861                    }
2862                    Err(err) => return Err(err.into()),
2863                }
2864
2865                Ok(())
2866            })
2867        }
2868    }
2869
2870    #[derive(Debug)]
2871    enum ContractBatchRegistryExecutor {
2872        Batch(BatchNodeExecutor<ContractBatchExecutor>),
2873        Sink(RecordingSinkExecutor),
2874    }
2875
2876    impl NodeExecutor for ContractBatchRegistryExecutor {
2877        type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2878
2879        fn run(&self, ctx: NodeContext, inputs: PortsIn, outputs: PortsOut) -> Self::RunFuture<'_> {
2880            match self {
2881                Self::Batch(executor) => executor.run(ctx, inputs, outputs),
2882                Self::Sink(executor) => executor.run(ctx, inputs, outputs),
2883            }
2884        }
2885    }
2886
2887    #[derive(Debug, Default)]
2888    struct SiblingCancellationExecutor {
2889        cancellation_observed: Mutex<bool>,
2890    }
2891
2892    impl SiblingCancellationExecutor {
2893        fn cancellation_observed(&self) -> bool {
2894            *self
2895                .cancellation_observed
2896                .lock()
2897                .expect("sibling cancellation executor lock should not be poisoned")
2898        }
2899    }
2900
2901    #[derive(Debug, Default)]
2902    struct CapacityProbeExecutor {
2903        observed: Mutex<Vec<Option<usize>>>,
2904    }
2905
2906    impl CapacityProbeExecutor {
2907        fn observed_capacities(&self) -> Vec<Option<usize>> {
2908            self.observed
2909                .lock()
2910                .expect("capacity probe executor lock should not be poisoned")
2911                .clone()
2912        }
2913    }
2914
2915    #[derive(Debug, Default)]
2916    struct RecordingMetadataSink {
2917        records: Mutex<Vec<MetadataRecord>>,
2918    }
2919
2920    impl RecordingMetadataSink {
2921        fn records(&self) -> Vec<MetadataRecord> {
2922            self.records
2923                .lock()
2924                .expect("metadata sink lock should not be poisoned")
2925                .clone()
2926        }
2927    }
2928
2929    impl MetadataSink for RecordingMetadataSink {
2930        fn record(&self, record: &MetadataRecord) -> Result<()> {
2931            self.records
2932                .lock()
2933                .expect("metadata sink lock should not be poisoned")
2934                .push(record.clone());
2935            Ok(())
2936        }
2937    }
2938
2939    #[derive(Debug, Default)]
2940    struct RecordingLifecycleHook {
2941        events: Mutex<Vec<LifecycleEventKind>>,
2942    }
2943
2944    impl RecordingLifecycleHook {
2945        fn recorded(&self) -> Vec<LifecycleEventKind> {
2946            self.events
2947                .lock()
2948                .expect("lifecycle hook lock should not be poisoned")
2949                .clone()
2950        }
2951    }
2952
2953    impl LifecycleHook for RecordingLifecycleHook {
2954        fn observe(&self, event: &LifecycleEvent) -> Result<()> {
2955            self.events
2956                .lock()
2957                .expect("lifecycle hook lock should not be poisoned")
2958                .push(event.kind());
2959            Ok(())
2960        }
2961    }
2962
2963    impl NodeExecutor for CapacityProbeExecutor {
2964        type RunFuture<'a> = Ready<Result<()>>;
2965
2966        fn run(
2967            &self,
2968            ctx: NodeContext,
2969            inputs: PortsIn,
2970            _outputs: PortsOut,
2971        ) -> Self::RunFuture<'_> {
2972            if ctx.node_id().as_str() == "probe" {
2973                let capacity = inputs.capacity(&port_id("in"));
2974                self.observed
2975                    .lock()
2976                    .expect("capacity probe executor lock should not be poisoned")
2977                    .push(capacity);
2978            }
2979
2980            ready(Ok(()))
2981        }
2982    }
2983
2984    impl NodeExecutor for SiblingCancellationExecutor {
2985        type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2986
2987        fn run(
2988            &self,
2989            ctx: NodeContext,
2990            mut inputs: PortsIn,
2991            _outputs: PortsOut,
2992        ) -> Self::RunFuture<'_> {
2993            Box::pin(async move {
2994                if ctx.node_id().as_str() == "fail" {
2995                    return Err(PureflowError::execution("fail requested"));
2996                }
2997
2998                if ctx.node_id().as_str() == "worker" {
2999                    let cancellation = ctx.cancellation_token();
3000                    let result: std::result::Result<Option<PortPacket>, PortRecvError> =
3001                        inputs.recv(&port_id("in"), &cancellation).await;
3002                    if matches!(result, Err(PortRecvError::Cancelled { .. })) {
3003                        *self
3004                            .cancellation_observed
3005                            .lock()
3006                            .expect("sibling cancellation executor lock should not be poisoned") =
3007                            true;
3008                        return Ok(());
3009                    }
3010
3011                    return Err(PureflowError::execution(
3012                        "worker input should be cancelled after sibling failure",
3013                    ));
3014                }
3015
3016                Ok(())
3017            })
3018        }
3019    }
3020
3021    fn execution_id(value: &str) -> ExecutionId {
3022        ExecutionId::new(value).expect("valid execution id")
3023    }
3024
3025    fn message_id(value: &str) -> MessageId {
3026        MessageId::new(value).expect("valid message id")
3027    }
3028
3029    fn packet(value: &[u8]) -> PortPacket {
3030        packet_between(value, "source", "sink")
3031    }
3032
3033    fn packet_between(value: &[u8], source_node: &str, target_node: &str) -> PortPacket {
3034        let source: MessageEndpoint = MessageEndpoint::new(node_id(source_node), port_id("out"));
3035        let target: MessageEndpoint = MessageEndpoint::new(node_id(target_node), port_id("in"));
3036        let route: MessageRoute = MessageRoute::new(Some(source), target);
3037        let execution: ExecutionMetadata = ExecutionMetadata::first_attempt(execution_id("run-1"));
3038        let metadata: MessageMetadata =
3039            MessageMetadata::new(message_id("msg-1"), workflow_id("flow"), execution, route);
3040
3041        PortPacket::new(metadata, PacketPayload::from(value.to_vec()))
3042    }
3043
3044    fn schema(value: &str) -> SchemaRef {
3045        SchemaRef::new(value).expect("valid schema ref")
3046    }
3047
3048    fn source_output_contracts() -> Vec<NodeContract> {
3049        vec![
3050            NodeContract::new(
3051                node_id("source"),
3052                vec![PortContract::new(
3053                    port_id("out"),
3054                    PortDirection::Output,
3055                    Some(schema("schema://packet")),
3056                )],
3057                ExecutionMode::Native,
3058                Determinism::Unknown,
3059                RetryDisposition::Unknown,
3060            )
3061            .expect("valid source contract"),
3062        ]
3063    }
3064
3065    fn cyclic_workflow() -> WorkflowDefinition {
3066        let first: NodeDefinition = NodeBuilder::new("first").input("in").output("out").build();
3067        let second: NodeDefinition = NodeBuilder::new("second").input("in").output("out").build();
3068        let graph = pureflow_workflow::WorkflowGraph::with_cycles_allowed(
3069            [first, second],
3070            [
3071                EdgeDefinition::new(
3072                    pureflow_workflow::EdgeEndpoint::new(node_id("first"), port_id("out")),
3073                    pureflow_workflow::EdgeEndpoint::new(node_id("second"), port_id("in")),
3074                ),
3075                EdgeDefinition::new(
3076                    pureflow_workflow::EdgeEndpoint::new(node_id("second"), port_id("out")),
3077                    pureflow_workflow::EdgeEndpoint::new(node_id("first"), port_id("in")),
3078                ),
3079            ],
3080        )
3081        .expect("cycle-allowed workflow graph should build");
3082
3083        WorkflowDefinition::new(workflow_id("flow"), graph)
3084    }
3085
3086    fn packet_payload_bytes(packet: PortPacket) -> Vec<u8> {
3087        packet
3088            .into_payload()
3089            .as_bytes()
3090            .expect("engine backpressure tests send bytes")
3091            .to_vec()
3092    }
3093
3094    #[test]
3095    fn workflow_run_policy_names_feedback_loop_startup_and_termination() {
3096        let feedback_loop: FeedbackLoopRunPolicy =
3097            FeedbackLoopRunPolicy::start_all_nodes_until_complete();
3098        let policy: WorkflowRunPolicy = WorkflowRunPolicy::feedback_loops(feedback_loop);
3099
3100        assert_eq!(feedback_loop.startup(), FeedbackLoopStartup::StartAllNodes);
3101        assert_eq!(
3102            feedback_loop.termination(),
3103            FeedbackLoopTermination::AllNodesComplete
3104        );
3105        assert_eq!(
3106            policy.cycle_policy(),
3107            CycleRunPolicy::AllowFeedbackLoops(feedback_loop)
3108        );
3109        assert_eq!(
3110            WorkflowRunPolicy::default().cycle_policy(),
3111            CycleRunPolicy::Reject
3112        );
3113        assert_eq!(
3114            WorkflowRunPolicy::default().watchdog_policy(),
3115            WorkflowWatchdogPolicy::Disabled
3116        );
3117    }
3118
3119    #[test]
3120    fn run_workflow_passes_execution_metadata_to_each_node() {
3121        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3122            .node(NodeBuilder::new("first").build())
3123            .node(NodeBuilder::new("second").build())
3124            .build();
3125        let execution: ExecutionMetadata = execution_metadata("run-1");
3126        let executor: RecordingExecutor = RecordingExecutor::default();
3127
3128        block_on(run_workflow(&workflow, &execution, &executor)).expect("workflow should run");
3129
3130        let contexts: Vec<NodeContext> = executor.visited_contexts();
3131        assert_eq!(contexts[0].workflow_id().as_str(), "flow");
3132        assert_eq!(contexts[0].execution().execution_id().as_str(), "run-1");
3133        assert_eq!(executor.visited_node_names(), vec!["first", "second"]);
3134    }
3135
3136    #[test]
3137    fn run_workflow_with_metadata_sink_records_lifecycle_events() {
3138        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3139            .node(NodeBuilder::new("source").output("out").build())
3140            .node(NodeBuilder::new("sink").input("in").build())
3141            .edge("source", "out", "sink", "in")
3142            .build();
3143        let execution: ExecutionMetadata = execution_metadata("run-1");
3144        let executor: RecordingExecutor = RecordingExecutor::default();
3145        let sink: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
3146
3147        block_on(run_workflow_with_metadata_sink(
3148            &workflow,
3149            &execution,
3150            &executor,
3151            sink.clone(),
3152        ))
3153        .expect("metadata workflow run should succeed");
3154        let lifecycle_count: usize = sink
3155            .records()
3156            .iter()
3157            .filter(|record: &&MetadataRecord| matches!(record, MetadataRecord::Lifecycle(_)))
3158            .count();
3159
3160        assert_eq!(lifecycle_count, 4);
3161    }
3162
3163    #[test]
3164    fn run_workflow_with_observers_summary_records_lifecycle_hook_and_metadata() {
3165        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3166            .node(NodeBuilder::new("node").build())
3167            .build();
3168        let execution: ExecutionMetadata = execution_metadata("run-1");
3169        let executor: RecordingExecutor = RecordingExecutor::default();
3170        let hook: RecordingLifecycleHook = RecordingLifecycleHook::default();
3171        let sink: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
3172
3173        let summary: WorkflowRunSummary = block_on(run_workflow_with_observers_summary(
3174            &workflow,
3175            &execution,
3176            &executor,
3177            &hook,
3178            sink.clone(),
3179        ))
3180        .expect("observer workflow run should return a summary");
3181
3182        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Completed);
3183        assert_eq!(summary.completed_node_count(), 1);
3184        assert_eq!(
3185            hook.recorded(),
3186            vec![
3187                LifecycleEventKind::NodeStarted,
3188                LifecycleEventKind::NodeCompleted
3189            ]
3190        );
3191
3192        let lifecycle_records: Vec<MetadataRecord> = sink
3193            .records()
3194            .into_iter()
3195            .filter(|record: &MetadataRecord| matches!(record, MetadataRecord::Lifecycle(_)))
3196            .collect();
3197        assert_eq!(lifecycle_records.len(), 2);
3198    }
3199
3200    #[test]
3201    fn run_workflow_with_registry_resolves_executor_per_node() {
3202        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3203            .node(NodeBuilder::new("source").output("out").build())
3204            .node(NodeBuilder::new("sink").input("in").build())
3205            .edge("source", "out", "sink", "in")
3206            .build();
3207        let execution: ExecutionMetadata = execution_metadata("run-1");
3208        let received: Arc<Mutex<Vec<Vec<u8>>>> = Arc::new(Mutex::new(Vec::new()));
3209        let registry: StaticNodeExecutorRegistry<RegistryExecutor> =
3210            StaticNodeExecutorRegistry::new(BTreeMap::from([
3211                (
3212                    node_id("source"),
3213                    RegistryExecutor::source(Arc::clone(&received)),
3214                ),
3215                (
3216                    node_id("sink"),
3217                    RegistryExecutor::sink(Arc::clone(&received)),
3218                ),
3219            ]));
3220
3221        block_on(run_workflow_with_registry(&workflow, &execution, &registry))
3222            .expect("registry workflow should run");
3223
3224        assert_eq!(
3225            *received
3226                .lock()
3227                .expect("registry test lock should not be poisoned"),
3228            vec![b"registered".to_vec()]
3229        );
3230    }
3231
3232    #[test]
3233    fn run_workflow_with_registry_rejects_missing_executor() {
3234        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3235            .node(NodeBuilder::new("missing").build())
3236            .build();
3237        let execution: ExecutionMetadata = execution_metadata("run-1");
3238        let registry: StaticNodeExecutorRegistry<RegistryExecutor> =
3239            StaticNodeExecutorRegistry::new(BTreeMap::new());
3240
3241        let err: PureflowError =
3242            block_on(run_workflow_with_registry(&workflow, &execution, &registry))
3243                .expect_err("missing registry entry should fail");
3244
3245        assert_eq!(err.code(), ErrorCode::NodeExecutionFailed);
3246        assert!(
3247            err.to_string()
3248                .contains("no executor registered for workflow node `missing`")
3249        );
3250    }
3251
3252    #[test]
3253    fn run_workflow_summary_reports_successful_terminal_state() {
3254        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3255            .node(NodeBuilder::new("first").build())
3256            .node(NodeBuilder::new("second").build())
3257            .build();
3258        let execution: ExecutionMetadata = execution_metadata("run-1");
3259        let executor: RecordingExecutor = RecordingExecutor::default();
3260
3261        let summary: WorkflowRunSummary =
3262            block_on(run_workflow_summary(&workflow, &execution, &executor))
3263                .expect("summary workflow should run");
3264
3265        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Completed);
3266        assert_eq!(summary.scheduled_node_count(), 2);
3267        assert_eq!(summary.completed_node_count(), 2);
3268        assert_eq!(summary.failed_node_count(), 0);
3269        assert_eq!(summary.cancelled_node_count(), 0);
3270        assert_eq!(summary.pending_node_count(), 0);
3271        assert_eq!(summary.error_count(), 0);
3272        assert_eq!(summary.observed_message_count(), 0);
3273        assert!(summary.first_error().is_none());
3274    }
3275
3276    #[test]
3277    fn run_workflow_summary_rejects_cycle_allowed_graph_by_default() {
3278        let workflow: WorkflowDefinition = cyclic_workflow();
3279        let execution: ExecutionMetadata = execution_metadata("run-1");
3280        let executor: RecordingExecutor = RecordingExecutor::default();
3281
3282        let err: PureflowError = block_on(run_workflow_summary(&workflow, &execution, &executor))
3283            .expect_err("default run policy should reject cyclic workflow");
3284
3285        assert_eq!(err.code(), ErrorCode::NodeExecutionFailed);
3286        assert!(err.to_string().contains("first -> second -> first"));
3287        assert!(
3288            err.to_string()
3289                .contains("explicit feedback-loop run policy")
3290        );
3291        assert!(executor.visited_contexts().is_empty());
3292    }
3293
3294    #[test]
3295    fn run_workflow_with_feedback_loop_policy_allows_cycle_allowed_graph() {
3296        let workflow: WorkflowDefinition = cyclic_workflow();
3297        let execution: ExecutionMetadata = execution_metadata("run-1");
3298        let executor: RecordingExecutor = RecordingExecutor::default();
3299        let policy: WorkflowRunPolicy = WorkflowRunPolicy::feedback_loops(
3300            FeedbackLoopRunPolicy::start_all_nodes_until_complete(),
3301        );
3302
3303        let summary: WorkflowRunSummary = block_on(run_workflow_with_policy_summary(
3304            &workflow, &execution, &executor, policy,
3305        ))
3306        .expect("feedback-loop policy should allow cyclic workflow");
3307
3308        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Completed);
3309        assert_eq!(summary.scheduled_node_count(), 2);
3310        assert_eq!(summary.completed_node_count(), 2);
3311        assert_eq!(
3312            executor.visited_node_names(),
3313            vec![String::from("first"), String::from("second")]
3314        );
3315    }
3316
3317    #[test]
3318    fn feedback_loop_policy_runs_deterministic_cycle_messages() {
3319        let workflow: WorkflowDefinition = cyclic_workflow();
3320        let execution: ExecutionMetadata = execution_metadata("run-1");
3321        let observed: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
3322        let registry: StaticNodeExecutorRegistry<FeedbackLoopExecutor> =
3323            StaticNodeExecutorRegistry::new(BTreeMap::from([
3324                (
3325                    node_id("first"),
3326                    FeedbackLoopExecutor::driver(Arc::clone(&observed)),
3327                ),
3328                (
3329                    node_id("second"),
3330                    FeedbackLoopExecutor::counter(Arc::clone(&observed)),
3331                ),
3332            ]));
3333        let policy: WorkflowRunPolicy = WorkflowRunPolicy::feedback_loops(
3334            FeedbackLoopRunPolicy::start_all_nodes_until_complete(),
3335        );
3336
3337        let summary: WorkflowRunSummary = block_on(run_workflow_with_registry_policy_summary(
3338            &workflow, &execution, &registry, policy,
3339        ))
3340        .expect("explicit feedback-loop policy should run cyclic messages");
3341
3342        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Completed);
3343        assert_eq!(summary.completed_node_count(), 2);
3344        assert_eq!(summary.pending_node_count(), 0);
3345        assert_eq!(
3346            *observed
3347                .lock()
3348                .expect("feedback loop observed lock should not be poisoned"),
3349            vec![String::from("counter:seed"), String::from("driver:ack")]
3350        );
3351    }
3352
3353    #[test]
3354    fn feedback_loop_policy_cancels_siblings_for_shutdown() {
3355        let workflow: WorkflowDefinition = cyclic_workflow();
3356        let execution: ExecutionMetadata = execution_metadata("run-1");
3357        let cancellation_observed: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
3358        let registry: StaticNodeExecutorRegistry<FeedbackLoopShutdownExecutor> =
3359            StaticNodeExecutorRegistry::new(BTreeMap::from([
3360                (
3361                    node_id("first"),
3362                    FeedbackLoopShutdownExecutor::failing(Arc::clone(&cancellation_observed)),
3363                ),
3364                (
3365                    node_id("second"),
3366                    FeedbackLoopShutdownExecutor::shutdown_watcher(Arc::clone(
3367                        &cancellation_observed,
3368                    )),
3369                ),
3370            ]));
3371        let policy: WorkflowRunPolicy = WorkflowRunPolicy::feedback_loops(
3372            FeedbackLoopRunPolicy::start_all_nodes_until_complete(),
3373        );
3374
3375        let summary: WorkflowRunSummary = block_on(run_workflow_with_registry_policy_summary(
3376            &workflow, &execution, &registry, policy,
3377        ))
3378        .expect("feedback-loop shutdown should return summary data");
3379
3380        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
3381        assert_eq!(summary.completed_node_count(), 1);
3382        assert_eq!(summary.failed_node_count(), 1);
3383        assert_eq!(summary.pending_node_count(), 0);
3384        assert!(
3385            summary
3386                .first_error()
3387                .expect("shutdown failure should be retained")
3388                .to_string()
3389                .contains("feedback loop shutdown requested")
3390        );
3391        assert!(
3392            *cancellation_observed
3393                .lock()
3394                .expect("shutdown observed lock should not be poisoned")
3395        );
3396    }
3397
3398    #[test]
3399    fn workflow_deadlock_watchdog_reports_stalled_cycle_state() {
3400        let workflow: WorkflowDefinition = cyclic_workflow();
3401        let execution: ExecutionMetadata = execution_metadata("run-1");
3402        let executor: WaitingInputExecutor = WaitingInputExecutor::default();
3403        let policy: WorkflowRunPolicy = WorkflowRunPolicy::feedback_loops(
3404            FeedbackLoopRunPolicy::start_all_nodes_until_complete(),
3405        )
3406        .with_watchdog(WorkflowWatchdogPolicy::deadlock_after(
3407            Duration::from_millis(1),
3408        ));
3409
3410        let summary: WorkflowRunSummary = block_on(run_workflow_with_policy_summary(
3411            &workflow, &execution, &executor, policy,
3412        ))
3413        .expect("deadlock watchdog should report a stalled cyclic workflow as summary data");
3414        let diagnostic: &WorkflowDeadlockDiagnostic = summary
3415            .deadlock_diagnostic()
3416            .expect("deadlock diagnostic should be captured");
3417        let err_text: String = summary
3418            .first_error()
3419            .expect("deadlock should be recorded as first error")
3420            .to_string();
3421
3422        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
3423        assert_eq!(summary.error_count(), 1);
3424        assert_eq!(summary.pending_node_count(), 2);
3425        assert!(err_text.contains("watchdog detected no workflow progress"));
3426        assert_eq!(diagnostic.pending_node_count(), 2);
3427        assert_eq!(diagnostic.bounded_edge_count(), 2);
3428        assert_eq!(
3429            executor.visited_node_names(),
3430            vec![String::from("first"), String::from("second")]
3431        );
3432    }
3433
3434    #[test]
3435    fn run_workflow_summary_retains_first_failure_without_returning_error() {
3436        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3437            .node(NodeBuilder::new("first").build())
3438            .node(NodeBuilder::new("second").build())
3439            .build();
3440        let execution: ExecutionMetadata = execution_metadata("run-1");
3441        let executor: AggregateFailureExecutor = AggregateFailureExecutor::default();
3442
3443        let summary: WorkflowRunSummary =
3444            block_on(run_workflow_summary(&workflow, &execution, &executor))
3445                .expect("summary should preserve node failures as data");
3446
3447        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
3448        assert_eq!(summary.scheduled_node_count(), 2);
3449        assert_eq!(summary.completed_node_count(), 1);
3450        assert_eq!(summary.failed_node_count(), 1);
3451        assert_eq!(summary.cancelled_node_count(), 0);
3452        assert_eq!(summary.error_count(), 1);
3453        assert!(
3454            summary
3455                .first_error()
3456                .expect("summary should retain first error")
3457                .to_string()
3458                .contains("first failed")
3459        );
3460    }
3461
3462    #[test]
3463    fn run_workflow_with_metadata_sink_records_workflow_error_metadata() {
3464        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3465            .node(NodeBuilder::new("first").build())
3466            .node(NodeBuilder::new("second").build())
3467            .build();
3468        let execution: ExecutionMetadata = execution_metadata("run-1");
3469        let executor: AggregateFailureExecutor = AggregateFailureExecutor::default();
3470        let sink: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
3471
3472        let summary: WorkflowRunSummary = block_on(run_workflow_with_metadata_sink_summary(
3473            &workflow,
3474            &execution,
3475            &executor,
3476            sink.clone(),
3477        ))
3478        .expect("summary should preserve node failures as data");
3479        let records: Vec<MetadataRecord> = sink.records();
3480        let workflow_error = records
3481            .iter()
3482            .find_map(|record: &MetadataRecord| match record {
3483                MetadataRecord::Error(error)
3484                    if error.kind() == ErrorMetadataKind::WorkflowFailed =>
3485                {
3486                    Some(error)
3487                }
3488                _ => None,
3489            })
3490            .expect("workflow error metadata should be recorded");
3491        let node_error_count: usize = records
3492            .iter()
3493            .filter(|record: &&MetadataRecord| {
3494                matches!(
3495                    record,
3496                    MetadataRecord::Error(error)
3497                        if error.kind() == ErrorMetadataKind::NodeFailed
3498                )
3499            })
3500            .count();
3501
3502        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
3503        assert_eq!(workflow_error.workflow_id().as_str(), "flow");
3504        assert!(workflow_error.node_id().is_none());
3505        assert!(workflow_error.error().to_string().contains("first failed"));
3506        assert!(workflow_error.diagnostic().is_none());
3507        assert_eq!(node_error_count, 1);
3508    }
3509
3510    #[test]
3511    fn watchdog_metadata_records_deadlock_diagnostic_payload() {
3512        let workflow: WorkflowDefinition = cyclic_workflow();
3513        let execution: ExecutionMetadata = execution_metadata("run-1");
3514        let executor: WaitingInputExecutor = WaitingInputExecutor::default();
3515        let registry: SingleNodeExecutorRegistry<'_, WaitingInputExecutor> =
3516            SingleNodeExecutorRegistry::new(&executor);
3517        let sink: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
3518        let policy: WorkflowRunPolicy = WorkflowRunPolicy::feedback_loops(
3519            FeedbackLoopRunPolicy::start_all_nodes_until_complete(),
3520        )
3521        .with_watchdog(WorkflowWatchdogPolicy::deadlock_after(
3522            Duration::from_millis(1),
3523        ));
3524
3525        let summary: WorkflowRunSummary =
3526            block_on(run_workflow_with_registry_policy_and_metadata_sink_summary(
3527                &workflow,
3528                &execution,
3529                &registry,
3530                policy,
3531                sink.clone(),
3532            ))
3533            .expect("watchdog run should return summary data");
3534        let records: Vec<MetadataRecord> = sink.records();
3535        let workflow_error = records
3536            .iter()
3537            .find_map(|record: &MetadataRecord| match record {
3538                MetadataRecord::Error(error)
3539                    if error.kind() == ErrorMetadataKind::WorkflowFailed =>
3540                {
3541                    Some(error)
3542                }
3543                _ => None,
3544            })
3545            .expect("workflow error metadata should be recorded");
3546
3547        assert!(summary.deadlock_diagnostic().is_some());
3548        match workflow_error.diagnostic() {
3549            Some(ErrorDiagnosticMetadata::WorkflowDeadlock(deadlock)) => {
3550                assert_eq!(deadlock.pending_node_count(), 2);
3551                assert_eq!(deadlock.bounded_edge_count(), 2);
3552                assert_eq!(deadlock.no_progress_timeout_ms(), 1);
3553                assert_eq!(deadlock.cycle_policy(), "allow_feedback_loops");
3554                assert_eq!(deadlock.feedback_loop_startup(), Some("start_all_nodes"));
3555                assert_eq!(
3556                    deadlock.feedback_loop_termination(),
3557                    Some("all_nodes_complete")
3558                );
3559            }
3560            _ => panic!("workflow error should include deadlock diagnostic metadata"),
3561        }
3562    }
3563
3564    #[test]
3565    fn supervisor_summary_covers_failing_source_and_downstream_cancellation() {
3566        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3567            .node(NodeBuilder::new("source").output("out").build())
3568            .node(
3569                NodeBuilder::new("transform")
3570                    .input("in")
3571                    .output("out")
3572                    .build(),
3573            )
3574            .node(NodeBuilder::new("sink").input("in").build())
3575            .edge("source", "out", "transform", "in")
3576            .edge("transform", "out", "sink", "in")
3577            .build();
3578        let execution: ExecutionMetadata = execution_metadata("run-1");
3579        let registry: StaticNodeExecutorRegistry<FailureMatrixExecutor> =
3580            StaticNodeExecutorRegistry::new(BTreeMap::from([
3581                (node_id("source"), FailureMatrixExecutor::source_fails()),
3582                (
3583                    node_id("transform"),
3584                    FailureMatrixExecutor::passthrough_transform(),
3585                ),
3586                (node_id("sink"), FailureMatrixExecutor::sink_waits()),
3587            ]));
3588
3589        let summary: WorkflowRunSummary = block_on(run_workflow_with_registry_summary(
3590            &workflow, &execution, &registry,
3591        ))
3592        .expect("summary should preserve source failure as data");
3593
3594        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
3595        assert_eq!(summary.scheduled_node_count(), 3);
3596        assert_eq!(summary.completed_node_count(), 0);
3597        assert_eq!(summary.failed_node_count(), 1);
3598        assert_eq!(summary.cancelled_node_count(), 2);
3599        assert_eq!(summary.pending_node_count(), 0);
3600        assert_eq!(summary.error_count(), 3);
3601        assert!(
3602            summary
3603                .first_error()
3604                .expect("summary should retain source failure")
3605                .to_string()
3606                .contains("matrix source failed")
3607        );
3608    }
3609
3610    #[test]
3611    fn supervisor_summary_covers_failing_transform_and_error_metadata() {
3612        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3613            .node(NodeBuilder::new("source").output("out").build())
3614            .node(
3615                NodeBuilder::new("transform")
3616                    .input("in")
3617                    .output("out")
3618                    .build(),
3619            )
3620            .node(NodeBuilder::new("sink").input("in").build())
3621            .edge("source", "out", "transform", "in")
3622            .edge("transform", "out", "sink", "in")
3623            .build();
3624        let execution: ExecutionMetadata = execution_metadata("run-1");
3625        let registry: StaticNodeExecutorRegistry<FailureMatrixExecutor> =
3626            StaticNodeExecutorRegistry::new(BTreeMap::from([
3627                (
3628                    node_id("source"),
3629                    FailureMatrixExecutor::source_for_transform_failure(),
3630                ),
3631                (
3632                    node_id("transform"),
3633                    FailureMatrixExecutor::transform_fails(),
3634                ),
3635                (node_id("sink"), FailureMatrixExecutor::sink_waits()),
3636            ]));
3637        let sink: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
3638
3639        let summary: WorkflowRunSummary =
3640            block_on(run_workflow_with_registry_and_metadata_sink_summary(
3641                &workflow,
3642                &execution,
3643                &registry,
3644                sink.clone(),
3645            ))
3646            .expect("summary should preserve transform failure as data");
3647        let records = sink.records();
3648        let node_error_count = records
3649            .iter()
3650            .filter(|record| {
3651                matches!(
3652                    record,
3653                    MetadataRecord::Error(error)
3654                        if error.kind() == ErrorMetadataKind::NodeFailed
3655                )
3656            })
3657            .count();
3658        let workflow_error = records
3659            .iter()
3660            .find_map(|record| match record {
3661                MetadataRecord::Error(error)
3662                    if error.kind() == ErrorMetadataKind::WorkflowFailed =>
3663                {
3664                    Some(error)
3665                }
3666                _ => None,
3667            })
3668            .expect("workflow error metadata should be recorded");
3669
3670        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
3671        assert_eq!(summary.completed_node_count(), 1);
3672        assert_eq!(summary.failed_node_count(), 1);
3673        assert_eq!(summary.cancelled_node_count(), 1);
3674        assert_eq!(summary.pending_node_count(), 0);
3675        assert_eq!(summary.error_count(), 2);
3676        assert!(
3677            workflow_error
3678                .error()
3679                .to_string()
3680                .contains("matrix transform failed")
3681        );
3682        assert!(workflow_error.node_id().is_none());
3683        assert_eq!(node_error_count, 2);
3684    }
3685
3686    #[test]
3687    fn supervisor_summary_covers_disconnected_downstream_send_failure() {
3688        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3689            .node(NodeBuilder::new("source").output("out").build())
3690            .node(NodeBuilder::new("sink").input("in").build())
3691            .edge("source", "out", "sink", "in")
3692            .build();
3693        let execution: ExecutionMetadata = execution_metadata("run-1");
3694        let (source, sink) = DisconnectedDownstreamExecutor::pair();
3695        let registry: StaticNodeExecutorRegistry<DisconnectedDownstreamExecutor> =
3696            StaticNodeExecutorRegistry::new(BTreeMap::from([
3697                (node_id("source"), source),
3698                (node_id("sink"), sink),
3699            ]));
3700
3701        let summary: WorkflowRunSummary = block_on(run_workflow_with_registry_summary(
3702            &workflow, &execution, &registry,
3703        ))
3704        .expect("summary should preserve disconnected send failure as data");
3705
3706        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
3707        assert_eq!(summary.completed_node_count(), 1);
3708        assert_eq!(summary.failed_node_count(), 1);
3709        assert_eq!(summary.cancelled_node_count(), 0);
3710        assert_eq!(summary.pending_node_count(), 0);
3711        assert_eq!(summary.error_count(), 1);
3712        assert!(
3713            summary
3714                .first_error()
3715                .expect("summary should retain disconnected send error")
3716                .to_string()
3717                .contains("disconnected")
3718        );
3719    }
3720
3721    #[test]
3722    fn supervisor_rejects_fan_out_partial_send_after_one_downstream_disconnects() {
3723        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3724            .node(NodeBuilder::new("source").output("out").build())
3725            .node(NodeBuilder::new("good").input("in").build())
3726            .node(NodeBuilder::new("drop").input("in").build())
3727            .edge("source", "out", "good", "in")
3728            .edge("source", "out", "drop", "in")
3729            .build();
3730        let execution: ExecutionMetadata = execution_metadata("run-1");
3731        let registry = FanOutPartialFailureExecutor::registry();
3732
3733        let summary: WorkflowRunSummary = block_on(run_workflow_with_registry_summary(
3734            &workflow, &execution, &registry,
3735        ))
3736        .expect("summary should preserve fan-out send failure as data");
3737        let good_sink_observation = registry
3738            .executors()
3739            .get(&node_id("good"))
3740            .expect("good sink executor should be registered")
3741            .good_sink_observation()
3742            .expect("good sink should record cancellation or closure");
3743
3744        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
3745        assert_eq!(summary.completed_node_count(), 2);
3746        assert_eq!(summary.failed_node_count(), 1);
3747        assert_eq!(summary.cancelled_node_count(), 0);
3748        assert_eq!(summary.pending_node_count(), 0);
3749        assert_eq!(summary.error_count(), 1);
3750        assert!(
3751            summary
3752                .first_error()
3753                .expect("summary should retain fan-out send error")
3754                .to_string()
3755                .contains("disconnected")
3756        );
3757        assert!(
3758            good_sink_observation == "cancelled_without_packet"
3759                || good_sink_observation == "disconnected_without_packet"
3760                || good_sink_observation == "closed_without_packet",
3761            "good sink must not receive a partial fan-out packet: {good_sink_observation}"
3762        );
3763    }
3764
3765    #[test]
3766    fn run_workflow_summary_reports_cancellation_terminal_state() {
3767        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3768            .node(NodeBuilder::new("node").build())
3769            .build();
3770        let execution: ExecutionMetadata = execution_metadata("run-1");
3771        let executor: CancelledExecutor = CancelledExecutor;
3772
3773        let summary: WorkflowRunSummary =
3774            block_on(run_workflow_summary(&workflow, &execution, &executor))
3775                .expect("summary should preserve cancellation as data");
3776
3777        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Cancelled);
3778        assert_eq!(summary.scheduled_node_count(), 1);
3779        assert_eq!(summary.completed_node_count(), 0);
3780        assert_eq!(summary.failed_node_count(), 0);
3781        assert_eq!(summary.cancelled_node_count(), 1);
3782        assert_eq!(summary.error_count(), 1);
3783    }
3784
3785    #[test]
3786    fn run_workflow_with_contracts_summary_accepts_matching_output_source() {
3787        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3788            .node(NodeBuilder::new("source").output("out").build())
3789            .build();
3790        let execution: ExecutionMetadata = execution_metadata("run-1");
3791        let executor: ContractOutputExecutor = ContractOutputExecutor::matching_source();
3792
3793        let summary: WorkflowRunSummary = block_on(run_workflow_with_contracts_summary(
3794            &workflow,
3795            &execution,
3796            &executor,
3797            &source_output_contracts(),
3798        ))
3799        .expect("contract-aware workflow should run");
3800
3801        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Completed);
3802        assert_eq!(summary.completed_node_count(), 1);
3803        assert!(summary.first_error().is_none());
3804    }
3805
3806    #[test]
3807    fn run_workflow_with_contracts_summary_rejects_mismatched_output_source() {
3808        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3809            .node(NodeBuilder::new("source").output("out").build())
3810            .build();
3811        let execution: ExecutionMetadata = execution_metadata("run-1");
3812        let executor: ContractOutputExecutor = ContractOutputExecutor::mismatched_source();
3813
3814        let summary: WorkflowRunSummary = block_on(run_workflow_with_contracts_summary(
3815            &workflow,
3816            &execution,
3817            &executor,
3818            &source_output_contracts(),
3819        ))
3820        .expect("contract-aware summary should preserve output validation failures as data");
3821
3822        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
3823        assert_eq!(summary.completed_node_count(), 0);
3824        assert_eq!(summary.failed_node_count(), 1);
3825        assert!(
3826            summary
3827                .first_error()
3828                .expect("summary should retain output validation error")
3829                .to_string()
3830                .contains("does not match output")
3831        );
3832    }
3833
3834    #[test]
3835    fn batch_node_executor_sends_matching_outputs_through_graph_edges() {
3836        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3837            .node(NodeBuilder::new("source").output("out").build())
3838            .node(NodeBuilder::new("sink").input("in").build())
3839            .edge("source", "out", "sink", "in")
3840            .build();
3841        let execution: ExecutionMetadata = execution_metadata("run-1");
3842        let received: Arc<Mutex<Vec<Vec<u8>>>> = Arc::new(Mutex::new(Vec::new()));
3843        let registry: StaticNodeExecutorRegistry<ContractBatchRegistryExecutor> =
3844            StaticNodeExecutorRegistry::new(BTreeMap::from([
3845                (
3846                    node_id("source"),
3847                    ContractBatchRegistryExecutor::Batch(BatchNodeExecutor::new(
3848                        ContractBatchExecutor::matching_source(),
3849                    )),
3850                ),
3851                (
3852                    node_id("sink"),
3853                    ContractBatchRegistryExecutor::Sink(RecordingSinkExecutor::new(Arc::clone(
3854                        &received,
3855                    ))),
3856                ),
3857            ]));
3858
3859        let summary: WorkflowRunSummary = block_on(run_workflow_with_registry_contracts_summary(
3860            &workflow,
3861            &execution,
3862            &registry,
3863            &source_output_contracts(),
3864        ))
3865        .expect("batch workflow should run through output contracts");
3866
3867        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Completed);
3868        assert_eq!(
3869            *received
3870                .lock()
3871                .expect("batch sink received lock should not be poisoned"),
3872            vec![b"contracted".to_vec()]
3873        );
3874    }
3875
3876    #[test]
3877    fn batch_node_executor_rejects_mismatched_output_before_sink_observes_it() {
3878        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3879            .node(NodeBuilder::new("source").output("out").build())
3880            .node(NodeBuilder::new("sink").input("in").build())
3881            .edge("source", "out", "sink", "in")
3882            .build();
3883        let execution: ExecutionMetadata = execution_metadata("run-1");
3884        let received: Arc<Mutex<Vec<Vec<u8>>>> = Arc::new(Mutex::new(Vec::new()));
3885        let registry: StaticNodeExecutorRegistry<ContractBatchRegistryExecutor> =
3886            StaticNodeExecutorRegistry::new(BTreeMap::from([
3887                (
3888                    node_id("source"),
3889                    ContractBatchRegistryExecutor::Batch(BatchNodeExecutor::new(
3890                        ContractBatchExecutor::mismatched_source(),
3891                    )),
3892                ),
3893                (
3894                    node_id("sink"),
3895                    ContractBatchRegistryExecutor::Sink(RecordingSinkExecutor::new(Arc::clone(
3896                        &received,
3897                    ))),
3898                ),
3899            ]));
3900
3901        let summary: WorkflowRunSummary = block_on(run_workflow_with_registry_contracts_summary(
3902            &workflow,
3903            &execution,
3904            &registry,
3905            &source_output_contracts(),
3906        ))
3907        .expect("batch output validation failures should be preserved as summary data");
3908
3909        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
3910        assert!(
3911            summary
3912                .first_error()
3913                .expect("summary should retain output validation error")
3914                .to_string()
3915                .contains("does not match output")
3916        );
3917        assert!(
3918            received
3919                .lock()
3920                .expect("batch sink received lock should not be poisoned")
3921                .is_empty()
3922        );
3923    }
3924
3925    #[test]
3926    fn batch_node_executor_rejects_undeclared_output_ports() {
3927        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3928            .node(NodeBuilder::new("source").output("out").build())
3929            .build();
3930        let execution: ExecutionMetadata = execution_metadata("run-1");
3931        let executor: BatchNodeExecutor<ContractBatchExecutor> =
3932            BatchNodeExecutor::new(ContractBatchExecutor::unknown_port());
3933
3934        let summary: WorkflowRunSummary =
3935            block_on(run_workflow_summary(&workflow, &execution, &executor))
3936                .expect("batch output validation failures should be summary data");
3937
3938        assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
3939        assert!(
3940            summary
3941                .first_error()
3942                .expect("summary should retain unknown output error")
3943                .to_string()
3944                .contains("output port `rogue` is not declared")
3945        );
3946    }
3947
3948    #[test]
3949    fn run_workflow_propagates_executor_failures() {
3950        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3951            .node(NodeBuilder::new("first").build())
3952            .build();
3953        let execution: ExecutionMetadata = execution_metadata("run-1");
3954        let executor: FailingExecutor = FailingExecutor::execution("boom");
3955
3956        let err = block_on(run_workflow(&workflow, &execution, &executor))
3957            .expect_err("workflow should surface executor failures");
3958
3959        assert_eq!(err.code(), ErrorCode::NodeExecutionFailed);
3960    }
3961
3962    #[test]
3963    fn run_workflow_passes_declared_node_ports_to_executor() {
3964        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3965            .node(NodeBuilder::new("source").output("out").build())
3966            .node(NodeBuilder::new("sink").input("in").build())
3967            .build();
3968        let execution: ExecutionMetadata = execution_metadata("run-1");
3969        let executor: RecordingExecutor = RecordingExecutor::default();
3970
3971        block_on(run_workflow(&workflow, &execution, &executor)).expect("workflow should run");
3972
3973        assert_eq!(
3974            executor.visited_input_port_names(),
3975            vec![Vec::<String>::new(), vec![String::from("in")]]
3976        );
3977        assert_eq!(
3978            executor.visited_output_port_names(),
3979            vec![vec![String::from("out")], Vec::<String>::new()]
3980        );
3981    }
3982
3983    #[test]
3984    fn run_workflow_wires_edges_as_bounded_port_channels() {
3985        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
3986            .node(NodeBuilder::new("source").output("out").build())
3987            .node(NodeBuilder::new("sink").input("in").build())
3988            .edge("source", "out", "sink", "in")
3989            .build();
3990        let execution: ExecutionMetadata = execution_metadata("run-1");
3991        let executor: ChannelExecutor = ChannelExecutor::default();
3992
3993        block_on(run_workflow(&workflow, &execution, &executor)).expect("workflow should run");
3994
3995        assert_eq!(
3996            executor.received_payloads(),
3997            vec![b"hello".to_vec(), b"world".to_vec()]
3998        );
3999    }
4000
4001    #[test]
4002    fn run_workflow_uses_explicit_edge_capacity() {
4003        let workflow: WorkflowDefinition = WorkflowDefinition::from_parts(
4004            workflow_id("flow"),
4005            [
4006                NodeBuilder::new("source").output("out").build(),
4007                NodeBuilder::new("probe").input("in").build(),
4008            ],
4009            [EdgeDefinition::with_capacity(
4010                pureflow_workflow::EdgeEndpoint::new(node_id("source"), port_id("out")),
4011                pureflow_workflow::EdgeEndpoint::new(node_id("probe"), port_id("in")),
4012                NonZeroUsize::new(3).expect("nonzero"),
4013            )],
4014        )
4015        .expect("workflow should be valid");
4016        let execution: ExecutionMetadata = execution_metadata("run-1");
4017        let executor: CapacityProbeExecutor = CapacityProbeExecutor::default();
4018
4019        block_on(run_workflow(&workflow, &execution, &executor)).expect("workflow should run");
4020
4021        assert_eq!(
4022            executor.observed_capacities(),
4023            vec![Some(NonZeroUsize::new(3).expect("nonzero").get())]
4024        );
4025    }
4026
4027    #[test]
4028    fn run_workflow_backpressure_blocks_until_downstream_receives() {
4029        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
4030            .node(NodeBuilder::new("source").output("out").build())
4031            .node(NodeBuilder::new("sink").input("in").build())
4032            .edge_with_capacity(
4033                "source",
4034                "out",
4035                "sink",
4036                "in",
4037                NonZeroUsize::new(1).expect("nonzero"),
4038            )
4039            .build();
4040        let execution: ExecutionMetadata = execution_metadata("run-1");
4041        let executor: BoundedBackpressureExecutor = BoundedBackpressureExecutor::default();
4042
4043        block_on(run_workflow(&workflow, &execution, &executor)).expect("workflow should run");
4044
4045        let events: Vec<String> = executor.events();
4046        assert!(
4047            events
4048                .iter()
4049                .any(|event: &String| event == "source-observed-full-edge")
4050        );
4051        assert!(
4052            events
4053                .iter()
4054                .any(|event: &String| event == "source-second-send-completed")
4055        );
4056        assert_eq!(
4057            executor.received_payloads(),
4058            vec![b"first".to_vec(), b"second".to_vec()]
4059        );
4060    }
4061
4062    #[test]
4063    fn run_workflow_fans_out_one_output_to_all_downstream_inputs() {
4064        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
4065            .node(NodeBuilder::new("source").output("out").build())
4066            .node(NodeBuilder::new("left").input("in").build())
4067            .node(NodeBuilder::new("right").input("in").build())
4068            .edge("source", "out", "left", "in")
4069            .edge("source", "out", "right", "in")
4070            .build();
4071        let execution: ExecutionMetadata = execution_metadata("run-1");
4072        let executor: FanOutExecutor = FanOutExecutor::default();
4073
4074        block_on(run_workflow(&workflow, &execution, &executor)).expect("workflow should run");
4075
4076        let received_by_node: BTreeMap<String, Vec<Vec<u8>>> = executor.received_by_node();
4077        assert_eq!(received_by_node.get("left"), Some(&vec![b"fan".to_vec()]));
4078        assert_eq!(received_by_node.get("right"), Some(&vec![b"fan".to_vec()]));
4079    }
4080
4081    #[test]
4082    fn run_workflow_fans_in_and_propagates_upstream_closure() {
4083        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
4084            .node(NodeBuilder::new("left").output("out").build())
4085            .node(NodeBuilder::new("right").output("out").build())
4086            .node(NodeBuilder::new("collector").input("in").build())
4087            .edge("left", "out", "collector", "in")
4088            .edge("right", "out", "collector", "in")
4089            .build();
4090        let execution: ExecutionMetadata = execution_metadata("run-1");
4091        let executor: FanInClosureExecutor = FanInClosureExecutor::default();
4092
4093        block_on(run_workflow(&workflow, &execution, &executor)).expect("workflow should run");
4094
4095        let mut received: Vec<Vec<u8>> = executor.received_payloads();
4096        received.sort();
4097        assert_eq!(received, vec![b"left".to_vec(), b"right".to_vec()]);
4098        assert!(executor.closure_observed());
4099    }
4100
4101    #[test]
4102    fn run_workflow_aggregates_terminal_results_after_polling_all_nodes() {
4103        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
4104            .node(NodeBuilder::new("first").build())
4105            .node(NodeBuilder::new("second").build())
4106            .build();
4107        let execution: ExecutionMetadata = execution_metadata("run-1");
4108        let executor: AggregateFailureExecutor = AggregateFailureExecutor::default();
4109
4110        let err = block_on(run_workflow(&workflow, &execution, &executor))
4111            .expect_err("workflow should surface executor failures");
4112
4113        assert_eq!(err.code(), ErrorCode::NodeExecutionFailed);
4114        assert_eq!(
4115            executor.visited_node_names(),
4116            vec![String::from("first"), String::from("second")]
4117        );
4118    }
4119
4120    #[test]
4121    fn run_workflow_cancels_siblings_after_first_node_failure() {
4122        let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
4123            .node(NodeBuilder::new("worker").input("in").build())
4124            .node(NodeBuilder::new("fail").output("out").build())
4125            .edge("fail", "out", "worker", "in")
4126            .build();
4127        let execution: ExecutionMetadata = execution_metadata("run-1");
4128        let executor: SiblingCancellationExecutor = SiblingCancellationExecutor::default();
4129
4130        let err = block_on(run_workflow(&workflow, &execution, &executor))
4131            .expect_err("workflow should surface the first node failure");
4132
4133        assert_eq!(err.code(), ErrorCode::NodeExecutionFailed);
4134        assert!(executor.cancellation_observed());
4135    }
4136}