1use std::num::NonZeroUsize;
4use std::time::Duration;
5use std::{collections::BTreeMap, fmt, future::Future, sync::Arc};
6
7use pureflow_contract::{NodeContract, PortContract, SchemaRef};
8use pureflow_core::{
9 BatchExecutor, BatchInputs, BatchOutputs, CancellationHandle, PureflowError, InputPortHandle,
10 MetadataRecord, MetadataSink, NodeExecutor, OutputPacketValidator, OutputPortHandle,
11 PortPacket, PortSendError, PortsIn, PortsOut, Result, bounded_edge_channel,
12 context::{CancellationRequest, CancellationToken, ExecutionMetadata, NodeContext},
13 lifecycle::{LifecycleHook, NoopLifecycleHook},
14 message::MessageEndpoint,
15 metadata::{
16 DeadlockDiagnosticMetadata, ErrorDiagnosticMetadata, ErrorMetadataRecord, NoopMetadataSink,
17 },
18};
19use pureflow_runtime::run_node_with_observers;
20use pureflow_types::{NodeId, PortId, WorkflowId};
21use pureflow_workflow::{
22 NodeDefinition, PortDirection, WorkflowDefinition, WorkflowValidationError,
23};
24use futures::{
25 channel::oneshot,
26 future::{BoxFuture, Either, select},
27 stream::{FuturesUnordered, Next, StreamExt},
28};
29
/// Default edge capacity: the smallest non-zero value (`NonZeroUsize::MIN`, i.e. 1).
// NOTE(review): presumably consumed by the port-wiring code later in this file — confirm.
const DEFAULT_EDGE_CAPACITY: NonZeroUsize = NonZeroUsize::MIN;
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq)]
34pub struct WorkflowRunPolicy {
35 cycle_policy: CycleRunPolicy,
36 watchdog_policy: WorkflowWatchdogPolicy,
37}
38
39impl WorkflowRunPolicy {
40 #[must_use]
42 pub const fn acyclic() -> Self {
43 Self {
44 cycle_policy: CycleRunPolicy::Reject,
45 watchdog_policy: WorkflowWatchdogPolicy::Disabled,
46 }
47 }
48
49 #[must_use]
51 pub const fn feedback_loops(feedback_loop: FeedbackLoopRunPolicy) -> Self {
52 Self {
53 cycle_policy: CycleRunPolicy::AllowFeedbackLoops(feedback_loop),
54 watchdog_policy: WorkflowWatchdogPolicy::Disabled,
55 }
56 }
57
58 #[must_use]
60 pub const fn cycle_policy(&self) -> CycleRunPolicy {
61 self.cycle_policy
62 }
63
64 #[must_use]
66 pub const fn with_watchdog(mut self, watchdog_policy: WorkflowWatchdogPolicy) -> Self {
67 self.watchdog_policy = watchdog_policy;
68 self
69 }
70
71 #[must_use]
73 pub const fn watchdog_policy(&self) -> WorkflowWatchdogPolicy {
74 self.watchdog_policy
75 }
76}
77
78impl Default for WorkflowRunPolicy {
79 fn default() -> Self {
80 Self::acyclic()
81 }
82}
83
/// Cycle handling options carried by [`WorkflowRunPolicy`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CycleRunPolicy {
    /// Selected by [`WorkflowRunPolicy::acyclic`] (and therefore by the default policy).
    Reject,
    /// Selected by [`WorkflowRunPolicy::feedback_loops`]; carries the feedback-loop settings.
    AllowFeedbackLoops(FeedbackLoopRunPolicy),
}
92
/// Startup and termination settings used when feedback loops are allowed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FeedbackLoopRunPolicy {
    /// Startup behaviour for the run.
    startup: FeedbackLoopStartup,
    /// Termination behaviour for the run.
    termination: FeedbackLoopTermination,
}

impl FeedbackLoopRunPolicy {
    /// Builds a policy from explicit startup and termination settings.
    #[must_use]
    pub const fn new(startup: FeedbackLoopStartup, termination: FeedbackLoopTermination) -> Self {
        Self {
            startup,
            termination,
        }
    }

    /// Preset pairing [`FeedbackLoopStartup::StartAllNodes`] with
    /// [`FeedbackLoopTermination::AllNodesComplete`].
    #[must_use]
    pub const fn start_all_nodes_until_complete() -> Self {
        Self {
            startup: FeedbackLoopStartup::StartAllNodes,
            termination: FeedbackLoopTermination::AllNodesComplete,
        }
    }

    /// Returns the startup setting.
    #[must_use]
    pub const fn startup(&self) -> FeedbackLoopStartup {
        self.startup
    }

    /// Returns the termination setting.
    #[must_use]
    pub const fn termination(&self) -> FeedbackLoopTermination {
        self.termination
    }
}

/// Startup setting for feedback-loop runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FeedbackLoopStartup {
    /// The only startup mode currently defined; labelled `start_all_nodes` in diagnostics.
    StartAllNodes,
}

/// Termination setting for feedback-loop runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FeedbackLoopTermination {
    /// The only termination mode currently defined; labelled `all_nodes_complete` in diagnostics.
    AllNodesComplete,
}
145
/// Watchdog configuration for a workflow run; the default is [`Self::Disabled`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum WorkflowWatchdogPolicy {
    /// No watchdog.
    #[default]
    Disabled,
    /// Deadlock watchdog configured by the carried [`DeadlockWatchdogPolicy`].
    Deadlock(DeadlockWatchdogPolicy),
}

impl WorkflowWatchdogPolicy {
    /// Returns [`Self::Disabled`].
    #[must_use]
    pub const fn disabled() -> Self {
        Self::Disabled
    }

    /// Returns [`Self::Deadlock`] configured with `no_progress_timeout`.
    #[must_use]
    pub const fn deadlock_after(no_progress_timeout: Duration) -> Self {
        let deadlock = DeadlockWatchdogPolicy {
            no_progress_timeout,
        };
        Self::Deadlock(deadlock)
    }
}

/// Settings for the deadlock watchdog.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DeadlockWatchdogPolicy {
    /// No-progress timeout configured for the watchdog.
    no_progress_timeout: Duration,
}

impl DeadlockWatchdogPolicy {
    /// Builds a deadlock watchdog policy with the given no-progress timeout.
    #[must_use]
    pub const fn new(no_progress_timeout: Duration) -> Self {
        Self {
            no_progress_timeout,
        }
    }

    /// Returns the configured no-progress timeout.
    #[must_use]
    pub const fn no_progress_timeout(&self) -> Duration {
        self.no_progress_timeout
    }
}
198
199#[derive(Debug, Clone, PartialEq, Eq)]
201pub struct WorkflowDeadlockDiagnostic {
202 workflow_id: WorkflowId,
203 scheduled_node_count: usize,
204 pending_node_count: usize,
205 completed_node_count: usize,
206 failed_node_count: usize,
207 cancelled_node_count: usize,
208 bounded_edge_count: usize,
209 no_progress_timeout: Duration,
210 cycle_policy: CycleRunPolicy,
211}
212
213impl WorkflowDeadlockDiagnostic {
214 fn from_run(
215 workflow: &WorkflowDefinition,
216 summary: &WorkflowRunSummary,
217 policy: WorkflowRunPolicy,
218 watchdog: DeadlockWatchdogPolicy,
219 ) -> Self {
220 Self {
221 workflow_id: workflow.id().clone(),
222 scheduled_node_count: summary.scheduled_node_count(),
223 pending_node_count: summary.pending_node_count(),
224 completed_node_count: summary.completed_node_count(),
225 failed_node_count: summary.failed_node_count(),
226 cancelled_node_count: summary.cancelled_node_count(),
227 bounded_edge_count: workflow.edges().len(),
228 no_progress_timeout: watchdog.no_progress_timeout(),
229 cycle_policy: policy.cycle_policy(),
230 }
231 }
232
233 #[must_use]
235 pub const fn workflow_id(&self) -> &WorkflowId {
236 &self.workflow_id
237 }
238
239 #[must_use]
241 pub const fn scheduled_node_count(&self) -> usize {
242 self.scheduled_node_count
243 }
244
245 #[must_use]
247 pub const fn pending_node_count(&self) -> usize {
248 self.pending_node_count
249 }
250
251 #[must_use]
253 pub const fn completed_node_count(&self) -> usize {
254 self.completed_node_count
255 }
256
257 #[must_use]
259 pub const fn failed_node_count(&self) -> usize {
260 self.failed_node_count
261 }
262
263 #[must_use]
265 pub const fn cancelled_node_count(&self) -> usize {
266 self.cancelled_node_count
267 }
268
269 #[must_use]
271 pub const fn bounded_edge_count(&self) -> usize {
272 self.bounded_edge_count
273 }
274
275 #[must_use]
277 pub const fn no_progress_timeout(&self) -> Duration {
278 self.no_progress_timeout
279 }
280
281 #[must_use]
283 pub const fn cycle_policy(&self) -> CycleRunPolicy {
284 self.cycle_policy
285 }
286
287 fn to_metadata_diagnostic(&self) -> ErrorDiagnosticMetadata {
288 let metadata: DeadlockDiagnosticMetadata = DeadlockDiagnosticMetadata::new(
289 self.scheduled_node_count,
290 self.pending_node_count,
291 self.bounded_edge_count,
292 duration_millis_u64(self.no_progress_timeout),
293 cycle_run_policy_label(self.cycle_policy),
294 )
295 .with_terminal_counts(
296 self.completed_node_count,
297 self.failed_node_count,
298 self.cancelled_node_count,
299 );
300
301 match self.cycle_policy {
302 CycleRunPolicy::Reject => ErrorDiagnosticMetadata::workflow_deadlock(metadata),
303 CycleRunPolicy::AllowFeedbackLoops(feedback_loop) => {
304 ErrorDiagnosticMetadata::workflow_deadlock(metadata.with_feedback_loop(
305 feedback_loop_startup_label(feedback_loop.startup()),
306 feedback_loop_termination_label(feedback_loop.termination()),
307 ))
308 }
309 }
310 }
311}
312
313impl fmt::Display for WorkflowDeadlockDiagnostic {
314 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315 write!(
316 f,
317 "workflow `{}` watchdog detected no workflow progress for {:?}: scheduled_nodes={}, pending_nodes={}, completed_nodes={}, failed_nodes={}, cancelled_nodes={}, bounded_edges={}, cycle_policy={:?}",
318 self.workflow_id,
319 self.no_progress_timeout,
320 self.scheduled_node_count,
321 self.pending_node_count,
322 self.completed_node_count,
323 self.failed_node_count,
324 self.cancelled_node_count,
325 self.bounded_edge_count,
326 self.cycle_policy
327 )
328 }
329}
330
/// Converts `duration` to whole milliseconds, saturating at `u64::MAX` when the value does
/// not fit in a `u64`.
fn duration_millis_u64(duration: Duration) -> u64 {
    let millis: u128 = duration.as_millis();
    match u64::try_from(millis) {
        Ok(fits) => fits,
        Err(_) => u64::MAX,
    }
}
334
335const fn cycle_run_policy_label(policy: CycleRunPolicy) -> &'static str {
336 match policy {
337 CycleRunPolicy::Reject => "reject",
338 CycleRunPolicy::AllowFeedbackLoops(_feedback_loop) => "allow_feedback_loops",
339 }
340}
341
342const fn feedback_loop_startup_label(startup: FeedbackLoopStartup) -> &'static str {
343 match startup {
344 FeedbackLoopStartup::StartAllNodes => "start_all_nodes",
345 }
346}
347
348const fn feedback_loop_termination_label(termination: FeedbackLoopTermination) -> &'static str {
349 match termination {
350 FeedbackLoopTermination::AllNodesComplete => "all_nodes_complete",
351 }
352}
353
/// Terminal state reported by a [`WorkflowRunSummary`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkflowTerminalState {
    /// Initial state of a new summary; it is kept as long as no error is recorded.
    Completed,
    /// Set when the first recorded error is anything other than a cancellation error.
    Failed,
    /// Set when the first recorded error is a `PureflowError::Cancellation`.
    Cancelled,
}
365
366#[derive(Debug, Clone, PartialEq, Eq)]
368pub struct WorkflowRunSummary {
369 terminal_state: WorkflowTerminalState,
370 scheduled_node_count: usize,
371 completed_node_count: usize,
372 failed_node_count: usize,
373 cancelled_node_count: usize,
374 observed_message_count: usize,
375 error_count: usize,
376 first_error: Option<PureflowError>,
377 deadlock_diagnostic: Option<WorkflowDeadlockDiagnostic>,
378}
379
380impl WorkflowRunSummary {
381 #[must_use]
383 pub const fn new(scheduled_node_count: usize) -> Self {
384 Self {
385 terminal_state: WorkflowTerminalState::Completed,
386 scheduled_node_count,
387 completed_node_count: 0,
388 failed_node_count: 0,
389 cancelled_node_count: 0,
390 observed_message_count: 0,
391 error_count: 0,
392 first_error: None,
393 deadlock_diagnostic: None,
394 }
395 }
396
397 #[must_use]
399 pub const fn terminal_state(&self) -> WorkflowTerminalState {
400 self.terminal_state
401 }
402
403 #[must_use]
405 pub const fn scheduled_node_count(&self) -> usize {
406 self.scheduled_node_count
407 }
408
409 #[must_use]
411 pub const fn completed_node_count(&self) -> usize {
412 self.completed_node_count
413 }
414
415 #[must_use]
417 pub const fn failed_node_count(&self) -> usize {
418 self.failed_node_count
419 }
420
421 #[must_use]
423 pub const fn cancelled_node_count(&self) -> usize {
424 self.cancelled_node_count
425 }
426
427 #[must_use]
432 pub const fn observed_message_count(&self) -> usize {
433 self.observed_message_count
434 }
435
436 #[must_use]
438 pub const fn error_count(&self) -> usize {
439 self.error_count
440 }
441
442 #[must_use]
444 pub const fn pending_node_count(&self) -> usize {
445 self.scheduled_node_count.saturating_sub(
446 self.completed_node_count
447 .saturating_add(self.failed_node_count)
448 .saturating_add(self.cancelled_node_count),
449 )
450 }
451
452 #[must_use]
454 pub const fn first_error(&self) -> Option<&PureflowError> {
455 self.first_error.as_ref()
456 }
457
458 #[must_use]
460 pub const fn deadlock_diagnostic(&self) -> Option<&WorkflowDeadlockDiagnostic> {
461 self.deadlock_diagnostic.as_ref()
462 }
463
464 pub fn into_result(self) -> Result<()> {
471 self.first_error.map_or(Ok(()), Err)
472 }
473
474 const fn record_success(&mut self) {
475 self.completed_node_count = self.completed_node_count.saturating_add(1);
476 }
477
478 fn record_error(&mut self, err: PureflowError) {
479 self.error_count = self.error_count.saturating_add(1);
480
481 if matches!(err, PureflowError::Cancellation(_)) {
482 self.cancelled_node_count = self.cancelled_node_count.saturating_add(1);
483 } else {
484 self.failed_node_count = self.failed_node_count.saturating_add(1);
485 }
486
487 if self.first_error.is_none() {
488 self.terminal_state = if matches!(err, PureflowError::Cancellation(_)) {
489 WorkflowTerminalState::Cancelled
490 } else {
491 WorkflowTerminalState::Failed
492 };
493 self.first_error = Some(err);
494 }
495 }
496
497 fn record_workflow_error(&mut self, err: PureflowError) {
498 self.error_count = self.error_count.saturating_add(1);
499
500 if self.first_error.is_none() {
501 self.terminal_state = if matches!(err, PureflowError::Cancellation(_)) {
502 WorkflowTerminalState::Cancelled
503 } else {
504 WorkflowTerminalState::Failed
505 };
506 self.first_error = Some(err);
507 }
508 }
509
510 fn record_deadlock_diagnostic(&mut self, diagnostic: WorkflowDeadlockDiagnostic) {
511 self.deadlock_diagnostic = Some(diagnostic);
512 }
513}
514
/// Lookup from a workflow node id to the executor that runs that node.
pub trait NodeExecutorRegistry: Sync {
    /// Executor type handed out by this registry; may be unsized.
    type Executor: NodeExecutor + ?Sized;

    /// Returns the executor to use for `node_id`.
    ///
    /// # Errors
    ///
    /// Implementations return an error when they cannot supply an executor for `node_id`.
    fn executor_for(&self, node_id: &NodeId) -> Result<&Self::Executor>;
}
530
531#[derive(Debug, Clone, Copy)]
533pub struct SingleNodeExecutorRegistry<'a, E: ?Sized> {
534 executor: &'a E,
535}
536
537impl<'a, E: ?Sized> SingleNodeExecutorRegistry<'a, E> {
538 #[must_use]
540 pub const fn new(executor: &'a E) -> Self {
541 Self { executor }
542 }
543}
544
545impl<E> NodeExecutorRegistry for SingleNodeExecutorRegistry<'_, E>
546where
547 E: NodeExecutor + ?Sized,
548{
549 type Executor = E;
550
551 fn executor_for(&self, _node_id: &NodeId) -> Result<&Self::Executor> {
552 Ok(self.executor)
553 }
554}
555
556#[derive(Debug, Clone)]
558pub struct StaticNodeExecutorRegistry<E> {
559 executors: BTreeMap<NodeId, E>,
560}
561
562impl<E> StaticNodeExecutorRegistry<E> {
563 #[must_use]
565 pub const fn new(executors: BTreeMap<NodeId, E>) -> Self {
566 Self { executors }
567 }
568
569 #[must_use]
571 pub const fn executors(&self) -> &BTreeMap<NodeId, E> {
572 &self.executors
573 }
574
575 pub fn insert(&mut self, node_id: NodeId, executor: E) -> Option<E> {
577 self.executors.insert(node_id, executor)
578 }
579}
580
581impl<E> NodeExecutorRegistry for StaticNodeExecutorRegistry<E>
582where
583 E: NodeExecutor,
584{
585 type Executor = E;
586
587 fn executor_for(&self, node_id: &NodeId) -> Result<&Self::Executor> {
588 self.executors.get(node_id).ok_or_else(|| {
589 PureflowError::execution(format!(
590 "no executor registered for workflow node `{node_id}`"
591 ))
592 })
593 }
594}
595
596#[derive(Debug)]
603pub struct BatchNodeExecutor<E> {
604 executor: E,
605}
606
607impl<E> BatchNodeExecutor<E> {
608 #[must_use]
610 pub const fn new(executor: E) -> Self {
611 Self { executor }
612 }
613
614 #[must_use]
616 pub const fn executor(&self) -> &E {
617 &self.executor
618 }
619}
620
621impl<E> NodeExecutor for BatchNodeExecutor<E>
622where
623 E: BatchExecutor,
624{
625 type RunFuture<'a>
626 = BoxFuture<'a, Result<()>>
627 where
628 Self: 'a;
629
630 fn run(&self, ctx: NodeContext, inputs: PortsIn, outputs: PortsOut) -> Self::RunFuture<'_> {
631 Box::pin(run_batch_node_executor(
632 &self.executor,
633 ctx,
634 inputs,
635 outputs,
636 ))
637 }
638}
639
640async fn run_batch_node_executor<E>(
641 executor: &E,
642 ctx: NodeContext,
643 mut inputs: PortsIn,
644 outputs: PortsOut,
645) -> Result<()>
646where
647 E: BatchExecutor,
648{
649 let cancellation: CancellationToken = ctx.cancellation_token();
650 let mut batch_inputs: BatchInputs = BatchInputs::new();
651 while let Some((port_id, packet)) = inputs.recv_any(&cancellation).await? {
652 batch_inputs.push(port_id, packet);
653 }
654
655 let batch_outputs: BatchOutputs = executor.invoke(batch_inputs)?;
656 send_batch_outputs(&outputs, batch_outputs, &cancellation).await
657}
658
659async fn send_batch_outputs(
660 outputs: &PortsOut,
661 batch_outputs: BatchOutputs,
662 cancellation: &CancellationToken,
663) -> Result<()> {
664 for (port_id, packets) in batch_outputs.into_packets_by_port() {
665 send_batch_output_port(outputs, &port_id, packets, cancellation).await?;
666 }
667 Ok(())
668}
669
670async fn send_batch_output_port(
671 outputs: &PortsOut,
672 port_id: &PortId,
673 packets: Vec<PortPacket>,
674 cancellation: &CancellationToken,
675) -> Result<()> {
676 for packet in packets {
677 outputs.send(port_id, packet, cancellation).await?;
678 }
679 Ok(())
680}
681
682#[derive(Debug, Clone)]
684pub struct WorkflowOutputContracts {
685 outputs_by_node: BTreeMap<NodeId, BTreeMap<PortId, Option<SchemaRef>>>,
686}
687
688impl WorkflowOutputContracts {
689 pub fn from_node_contracts(
696 workflow: &WorkflowDefinition,
697 contracts: &[NodeContract],
698 ) -> Result<Self> {
699 let contract_map: BTreeMap<&NodeId, &NodeContract> = contracts
700 .iter()
701 .map(|contract: &NodeContract| (contract.id(), contract))
702 .collect();
703 let mut outputs_by_node: BTreeMap<NodeId, BTreeMap<PortId, Option<SchemaRef>>> =
704 BTreeMap::new();
705
706 for node in workflow.nodes() {
707 let Some(contract): Option<&NodeContract> = contract_map.get(node.id()).copied() else {
708 if node.output_ports().is_empty() {
709 outputs_by_node.insert(node.id().clone(), BTreeMap::new());
710 continue;
711 }
712
713 return Err(PureflowError::execution(format!(
714 "no output contract supplied for workflow node `{}`",
715 node.id()
716 )));
717 };
718 let mut output_contracts: BTreeMap<PortId, Option<SchemaRef>> = BTreeMap::new();
719
720 for port_id in node.output_ports() {
721 let port_contract: &PortContract = contract
722 .ports()
723 .iter()
724 .find(|port: &&PortContract| port.port_id() == port_id)
725 .ok_or_else(|| {
726 PureflowError::execution(format!(
727 "node `{}` output port `{port_id}` has no output contract",
728 node.id()
729 ))
730 })?;
731 if port_contract.direction() != PortDirection::Output {
732 return Err(PureflowError::execution(format!(
733 "node `{}` port `{port_id}` contract is not an output contract",
734 node.id()
735 )));
736 }
737 output_contracts.insert(port_id.clone(), port_contract.schema().cloned());
738 }
739
740 for port_contract in contract.ports() {
741 if port_contract.direction() == PortDirection::Output
742 && !node.output_ports().contains(port_contract.port_id())
743 {
744 return Err(PureflowError::execution(format!(
745 "node `{}` contract references unknown output port `{}`",
746 node.id(),
747 port_contract.port_id()
748 )));
749 }
750 }
751
752 outputs_by_node.insert(node.id().clone(), output_contracts);
753 }
754
755 for contract in contracts {
756 if workflow
757 .nodes()
758 .iter()
759 .all(|node: &NodeDefinition| node.id() != contract.id())
760 {
761 return Err(PureflowError::execution(format!(
762 "output contract references unknown workflow node `{}`",
763 contract.id()
764 )));
765 }
766 }
767
768 Ok(Self { outputs_by_node })
769 }
770
771 fn output_contracts_for(
772 &self,
773 node_id: &NodeId,
774 ) -> Option<&BTreeMap<PortId, Option<SchemaRef>>> {
775 self.outputs_by_node.get(node_id)
776 }
777}
778
779#[derive(Debug, Clone)]
780struct ContractOutputValidator {
781 workflow_id: WorkflowId,
782 node_id: NodeId,
783 execution: ExecutionMetadata,
784 output_contracts: BTreeMap<PortId, Option<SchemaRef>>,
785}
786
787impl ContractOutputValidator {
788 const fn new(
789 workflow_id: WorkflowId,
790 node_id: NodeId,
791 execution: ExecutionMetadata,
792 output_contracts: BTreeMap<PortId, Option<SchemaRef>>,
793 ) -> Self {
794 Self {
795 workflow_id,
796 node_id,
797 execution,
798 output_contracts,
799 }
800 }
801
802 fn reject(port_id: &PortId, reason: impl Into<String>) -> PortSendError {
803 PortSendError::Rejected {
804 port_id: port_id.clone(),
805 reason: reason.into(),
806 }
807 }
808}
809
810impl OutputPacketValidator for ContractOutputValidator {
811 fn validate(
812 &self,
813 port_id: &PortId,
814 packet: &PortPacket,
815 ) -> std::result::Result<(), PortSendError> {
816 if !self.output_contracts.contains_key(port_id) {
817 return Err(Self::reject(
818 port_id,
819 format!(
820 "node `{}` output port `{port_id}` has no output contract",
821 self.node_id
822 ),
823 ));
824 }
825
826 if packet.metadata().workflow_id() != &self.workflow_id {
827 return Err(Self::reject(
828 port_id,
829 format!(
830 "packet workflow `{}` does not match workflow `{}`",
831 packet.metadata().workflow_id(),
832 self.workflow_id
833 ),
834 ));
835 }
836
837 if packet.metadata().execution() != &self.execution {
838 return Err(Self::reject(
839 port_id,
840 format!(
841 "packet execution `{}` does not match execution `{}`",
842 packet.metadata().execution().execution_id(),
843 self.execution.execution_id()
844 ),
845 ));
846 }
847
848 let Some(source): Option<&MessageEndpoint> = packet.metadata().route().source() else {
849 return Err(Self::reject(port_id, "packet route has no source endpoint"));
850 };
851 if source.node_id() != &self.node_id || source.port_id() != port_id {
852 return Err(Self::reject(
853 port_id,
854 format!(
855 "packet source `{}:{}` does not match output `{}:{port_id}`",
856 source.node_id(),
857 source.port_id(),
858 self.node_id
859 ),
860 ));
861 }
862
863 Ok(())
864 }
865}
866
867pub async fn run_workflow_with_registry_summary<R>(
874 workflow: &WorkflowDefinition,
875 execution: &ExecutionMetadata,
876 registry: &R,
877) -> Result<WorkflowRunSummary>
878where
879 R: NodeExecutorRegistry + ?Sized,
880{
881 let lifecycle_hook: NoopLifecycleHook = NoopLifecycleHook;
882 run_workflow_with_registry_and_observers_summary(
883 workflow,
884 execution,
885 registry,
886 &lifecycle_hook,
887 Arc::new(NoopMetadataSink),
888 )
889 .await
890}
891
892pub async fn run_workflow_with_registry_policy_summary<R>(
899 workflow: &WorkflowDefinition,
900 execution: &ExecutionMetadata,
901 registry: &R,
902 policy: WorkflowRunPolicy,
903) -> Result<WorkflowRunSummary>
904where
905 R: NodeExecutorRegistry + ?Sized,
906{
907 let lifecycle_hook: NoopLifecycleHook = NoopLifecycleHook;
908 run_workflow_with_registry_and_observers_summary_inner(
909 workflow,
910 execution,
911 registry,
912 &lifecycle_hook,
913 Arc::new(NoopMetadataSink),
914 policy,
915 None,
916 )
917 .await
918}
919
920pub async fn run_workflow_with_registry<R>(
927 workflow: &WorkflowDefinition,
928 execution: &ExecutionMetadata,
929 registry: &R,
930) -> Result<()>
931where
932 R: NodeExecutorRegistry + ?Sized,
933{
934 run_workflow_with_registry_summary(workflow, execution, registry)
935 .await?
936 .into_result()
937}
938
939pub async fn run_workflow_with_registry_policy<R>(
946 workflow: &WorkflowDefinition,
947 execution: &ExecutionMetadata,
948 registry: &R,
949 policy: WorkflowRunPolicy,
950) -> Result<()>
951where
952 R: NodeExecutorRegistry + ?Sized,
953{
954 run_workflow_with_registry_policy_summary(workflow, execution, registry, policy)
955 .await?
956 .into_result()
957}
958
959pub async fn run_workflow_with_registry_policy_and_metadata_sink_summary<R, M>(
967 workflow: &WorkflowDefinition,
968 execution: &ExecutionMetadata,
969 registry: &R,
970 policy: WorkflowRunPolicy,
971 metadata_sink: Arc<M>,
972) -> Result<WorkflowRunSummary>
973where
974 R: NodeExecutorRegistry + ?Sized,
975 M: MetadataSink + 'static,
976{
977 let lifecycle_hook: NoopLifecycleHook = NoopLifecycleHook;
978 run_workflow_with_registry_and_observers_summary_inner(
979 workflow,
980 execution,
981 registry,
982 &lifecycle_hook,
983 metadata_sink,
984 policy,
985 None,
986 )
987 .await
988}
989
990pub async fn run_workflow_with_registry_policy_and_metadata_sink<R, M>(
998 workflow: &WorkflowDefinition,
999 execution: &ExecutionMetadata,
1000 registry: &R,
1001 policy: WorkflowRunPolicy,
1002 metadata_sink: Arc<M>,
1003) -> Result<()>
1004where
1005 R: NodeExecutorRegistry + ?Sized,
1006 M: MetadataSink + 'static,
1007{
1008 run_workflow_with_registry_policy_and_metadata_sink_summary(
1009 workflow,
1010 execution,
1011 registry,
1012 policy,
1013 metadata_sink,
1014 )
1015 .await?
1016 .into_result()
1017}
1018
1019pub async fn run_workflow_with_registry_and_metadata_sink_summary<R, M>(
1026 workflow: &WorkflowDefinition,
1027 execution: &ExecutionMetadata,
1028 registry: &R,
1029 metadata_sink: Arc<M>,
1030) -> Result<WorkflowRunSummary>
1031where
1032 R: NodeExecutorRegistry + ?Sized,
1033 M: MetadataSink + 'static,
1034{
1035 let lifecycle_hook: NoopLifecycleHook = NoopLifecycleHook;
1036 run_workflow_with_registry_and_observers_summary(
1037 workflow,
1038 execution,
1039 registry,
1040 &lifecycle_hook,
1041 metadata_sink,
1042 )
1043 .await
1044}
1045
1046pub async fn run_workflow_with_registry_and_observers_summary<R, H, M>(
1053 workflow: &WorkflowDefinition,
1054 execution: &ExecutionMetadata,
1055 registry: &R,
1056 lifecycle_hook: &H,
1057 metadata_sink: Arc<M>,
1058) -> Result<WorkflowRunSummary>
1059where
1060 R: NodeExecutorRegistry + ?Sized,
1061 H: LifecycleHook + ?Sized,
1062 M: MetadataSink + 'static,
1063{
1064 run_workflow_with_registry_and_observers_summary_inner(
1065 workflow,
1066 execution,
1067 registry,
1068 lifecycle_hook,
1069 metadata_sink,
1070 WorkflowRunPolicy::default(),
1071 None,
1072 )
1073 .await
1074}
1075
1076async fn run_workflow_with_registry_and_observers_summary_inner<R, H, M>(
1077 workflow: &WorkflowDefinition,
1078 execution: &ExecutionMetadata,
1079 registry: &R,
1080 lifecycle_hook: &H,
1081 metadata_sink: Arc<M>,
1082 policy: WorkflowRunPolicy,
1083 output_contracts: Option<&WorkflowOutputContracts>,
1084) -> Result<WorkflowRunSummary>
1085where
1086 R: NodeExecutorRegistry + ?Sized,
1087 H: LifecycleHook + ?Sized,
1088 M: MetadataSink + 'static,
1089{
1090 validate_workflow_run_policy(workflow, policy)?;
1091
1092 let (mut inputs_by_node, mut outputs_by_node): PortWiring = build_port_wiring(workflow);
1093 let cancellation: CancellationHandle = CancellationHandle::new();
1094
1095 let node_runs: FuturesUnordered<_> = FuturesUnordered::new();
1096 for node in workflow.nodes() {
1097 let executor: &R::Executor = registry.executor_for(node.id())?;
1098 let node_id: NodeId = node.id().clone();
1099 let ctx: NodeContext =
1100 NodeContext::new(workflow.id().clone(), node_id.clone(), execution.clone())
1101 .with_cancellation_token(cancellation.token());
1102 let inputs: PortsIn = PortsIn::from_handles(
1103 node.input_ports().to_vec(),
1104 inputs_by_node.remove(node.id()).unwrap_or_default(),
1105 );
1106 let mut outputs: PortsOut = PortsOut::from_handles(
1107 node.output_ports().to_vec(),
1108 outputs_by_node.remove(node.id()).unwrap_or_default(),
1109 );
1110 if let Some(output_contracts) = output_contracts {
1111 let node_output_contracts: BTreeMap<PortId, Option<SchemaRef>> = output_contracts
1112 .output_contracts_for(node.id())
1113 .cloned()
1114 .ok_or_else(|| {
1115 PureflowError::execution(format!(
1116 "no output contracts supplied for workflow node `{}`",
1117 node.id()
1118 ))
1119 })?;
1120 outputs = outputs.with_output_validator(Arc::new(ContractOutputValidator::new(
1121 workflow.id().clone(),
1122 node_id.clone(),
1123 execution.clone(),
1124 node_output_contracts,
1125 )));
1126 }
1127 let metadata_sink: Arc<M> = metadata_sink.clone();
1128 node_runs.push(async move {
1129 let result: Result<()> = run_node_with_observers(
1130 executor,
1131 ctx,
1132 inputs,
1133 outputs,
1134 lifecycle_hook,
1135 metadata_sink,
1136 )
1137 .await;
1138 (node_id, result)
1139 });
1140 }
1141
1142 collect_workflow_summary(
1143 node_runs,
1144 WorkflowCollectionContext::new(
1145 &cancellation,
1146 workflow,
1147 execution,
1148 metadata_sink.as_ref(),
1149 policy,
1150 ),
1151 workflow.nodes().len(),
1152 )
1153 .await
1154}
1155
1156pub async fn run_workflow_with_registry_contracts_and_observers_summary<R, H, M>(
1163 workflow: &WorkflowDefinition,
1164 execution: &ExecutionMetadata,
1165 registry: &R,
1166 contracts: &[NodeContract],
1167 lifecycle_hook: &H,
1168 metadata_sink: Arc<M>,
1169) -> Result<WorkflowRunSummary>
1170where
1171 R: NodeExecutorRegistry + ?Sized,
1172 H: LifecycleHook + ?Sized,
1173 M: MetadataSink + 'static,
1174{
1175 let output_contracts: WorkflowOutputContracts =
1176 WorkflowOutputContracts::from_node_contracts(workflow, contracts)?;
1177 run_workflow_with_registry_and_observers_summary_inner(
1178 workflow,
1179 execution,
1180 registry,
1181 lifecycle_hook,
1182 metadata_sink,
1183 WorkflowRunPolicy::default(),
1184 Some(&output_contracts),
1185 )
1186 .await
1187}
1188
1189pub async fn run_workflow_with_registry_contracts_and_observers<R, H, M>(
1197 workflow: &WorkflowDefinition,
1198 execution: &ExecutionMetadata,
1199 registry: &R,
1200 contracts: &[NodeContract],
1201 lifecycle_hook: &H,
1202 metadata_sink: Arc<M>,
1203) -> Result<()>
1204where
1205 R: NodeExecutorRegistry + ?Sized,
1206 H: LifecycleHook + ?Sized,
1207 M: MetadataSink + 'static,
1208{
1209 run_workflow_with_registry_contracts_and_observers_summary(
1210 workflow,
1211 execution,
1212 registry,
1213 contracts,
1214 lifecycle_hook,
1215 metadata_sink,
1216 )
1217 .await?
1218 .into_result()
1219}
1220
1221pub async fn run_workflow_with_registry_contracts_summary<R>(
1228 workflow: &WorkflowDefinition,
1229 execution: &ExecutionMetadata,
1230 registry: &R,
1231 contracts: &[NodeContract],
1232) -> Result<WorkflowRunSummary>
1233where
1234 R: NodeExecutorRegistry + ?Sized,
1235{
1236 let lifecycle_hook: NoopLifecycleHook = NoopLifecycleHook;
1237 run_workflow_with_registry_contracts_and_observers_summary(
1238 workflow,
1239 execution,
1240 registry,
1241 contracts,
1242 &lifecycle_hook,
1243 Arc::new(NoopMetadataSink),
1244 )
1245 .await
1246}
1247
1248pub async fn run_workflow_with_registry_contracts<R>(
1256 workflow: &WorkflowDefinition,
1257 execution: &ExecutionMetadata,
1258 registry: &R,
1259 contracts: &[NodeContract],
1260) -> Result<()>
1261where
1262 R: NodeExecutorRegistry + ?Sized,
1263{
1264 run_workflow_with_registry_contracts_summary(workflow, execution, registry, contracts)
1265 .await?
1266 .into_result()
1267}
1268
1269pub async fn run_workflow_with_registry_and_observers<R, H, M>(
1277 workflow: &WorkflowDefinition,
1278 execution: &ExecutionMetadata,
1279 registry: &R,
1280 lifecycle_hook: &H,
1281 metadata_sink: Arc<M>,
1282) -> Result<()>
1283where
1284 R: NodeExecutorRegistry + ?Sized,
1285 H: LifecycleHook + ?Sized,
1286 M: MetadataSink + 'static,
1287{
1288 run_workflow_with_registry_and_observers_summary(
1289 workflow,
1290 execution,
1291 registry,
1292 lifecycle_hook,
1293 metadata_sink,
1294 )
1295 .await?
1296 .into_result()
1297}
1298
1299pub async fn run_workflow_with_registry_and_metadata_sink<R, M>(
1307 workflow: &WorkflowDefinition,
1308 execution: &ExecutionMetadata,
1309 registry: &R,
1310 metadata_sink: Arc<M>,
1311) -> Result<()>
1312where
1313 R: NodeExecutorRegistry + ?Sized,
1314 M: MetadataSink + 'static,
1315{
1316 run_workflow_with_registry_and_metadata_sink_summary(
1317 workflow,
1318 execution,
1319 registry,
1320 metadata_sink,
1321 )
1322 .await?
1323 .into_result()
1324}
1325
1326pub async fn run_workflow<E: NodeExecutor + ?Sized>(
1333 workflow: &WorkflowDefinition,
1334 execution: &ExecutionMetadata,
1335 executor: &E,
1336) -> Result<()> {
1337 let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1338 run_workflow_with_registry(workflow, execution, ®istry).await
1339}
1340
1341pub async fn run_workflow_summary<E: NodeExecutor + ?Sized>(
1348 workflow: &WorkflowDefinition,
1349 execution: &ExecutionMetadata,
1350 executor: &E,
1351) -> Result<WorkflowRunSummary> {
1352 let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1353 run_workflow_with_registry_summary(workflow, execution, ®istry).await
1354}
1355
1356pub async fn run_workflow_with_policy_summary<E: NodeExecutor + ?Sized>(
1363 workflow: &WorkflowDefinition,
1364 execution: &ExecutionMetadata,
1365 executor: &E,
1366 policy: WorkflowRunPolicy,
1367) -> Result<WorkflowRunSummary> {
1368 let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1369 run_workflow_with_registry_policy_summary(workflow, execution, ®istry, policy).await
1370}
1371
1372pub async fn run_workflow_with_policy<E: NodeExecutor + ?Sized>(
1379 workflow: &WorkflowDefinition,
1380 execution: &ExecutionMetadata,
1381 executor: &E,
1382 policy: WorkflowRunPolicy,
1383) -> Result<()> {
1384 let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1385 run_workflow_with_registry_policy(workflow, execution, ®istry, policy).await
1386}
1387
1388pub async fn run_workflow_with_observers_summary<E, H, M>(
1395 workflow: &WorkflowDefinition,
1396 execution: &ExecutionMetadata,
1397 executor: &E,
1398 lifecycle_hook: &H,
1399 metadata_sink: Arc<M>,
1400) -> Result<WorkflowRunSummary>
1401where
1402 E: NodeExecutor + ?Sized,
1403 H: LifecycleHook + ?Sized,
1404 M: MetadataSink + 'static,
1405{
1406 let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1407 run_workflow_with_registry_and_observers_summary(
1408 workflow,
1409 execution,
1410 ®istry,
1411 lifecycle_hook,
1412 metadata_sink,
1413 )
1414 .await
1415}
1416
1417pub async fn run_workflow_with_observers<E, H, M>(
1424 workflow: &WorkflowDefinition,
1425 execution: &ExecutionMetadata,
1426 executor: &E,
1427 lifecycle_hook: &H,
1428 metadata_sink: Arc<M>,
1429) -> Result<()>
1430where
1431 E: NodeExecutor + ?Sized,
1432 H: LifecycleHook + ?Sized,
1433 M: MetadataSink + 'static,
1434{
1435 let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1436 run_workflow_with_registry_and_observers(
1437 workflow,
1438 execution,
1439 ®istry,
1440 lifecycle_hook,
1441 metadata_sink,
1442 )
1443 .await
1444}
1445
1446pub async fn run_workflow_with_contracts_summary<E: NodeExecutor + ?Sized>(
1453 workflow: &WorkflowDefinition,
1454 execution: &ExecutionMetadata,
1455 executor: &E,
1456 contracts: &[NodeContract],
1457) -> Result<WorkflowRunSummary> {
1458 let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1459 run_workflow_with_registry_contracts_summary(workflow, execution, ®istry, contracts).await
1460}
1461
1462pub async fn run_workflow_with_contracts<E: NodeExecutor + ?Sized>(
1470 workflow: &WorkflowDefinition,
1471 execution: &ExecutionMetadata,
1472 executor: &E,
1473 contracts: &[NodeContract],
1474) -> Result<()> {
1475 let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1476 run_workflow_with_registry_contracts(workflow, execution, ®istry, contracts).await
1477}
1478
1479pub async fn run_workflow_with_metadata_sink<E, M>(
1486 workflow: &WorkflowDefinition,
1487 execution: &ExecutionMetadata,
1488 executor: &E,
1489 metadata_sink: Arc<M>,
1490) -> Result<()>
1491where
1492 E: NodeExecutor + ?Sized,
1493 M: MetadataSink + 'static,
1494{
1495 let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1496 run_workflow_with_registry_and_metadata_sink(workflow, execution, ®istry, metadata_sink)
1497 .await
1498}
1499
1500pub async fn run_workflow_with_metadata_sink_summary<E, M>(
1507 workflow: &WorkflowDefinition,
1508 execution: &ExecutionMetadata,
1509 executor: &E,
1510 metadata_sink: Arc<M>,
1511) -> Result<WorkflowRunSummary>
1512where
1513 E: NodeExecutor + ?Sized,
1514 M: MetadataSink + 'static,
1515{
1516 let registry: SingleNodeExecutorRegistry<'_, E> = SingleNodeExecutorRegistry::new(executor);
1517 run_workflow_with_registry_and_metadata_sink_summary(
1518 workflow,
1519 execution,
1520 ®istry,
1521 metadata_sink,
1522 )
1523 .await
1524}
1525
1526#[derive(Clone, Copy)]
1527struct WorkflowCollectionContext<'a> {
1528 cancellation: &'a CancellationHandle,
1529 workflow: &'a WorkflowDefinition,
1530 execution: &'a ExecutionMetadata,
1531 metadata_sink: &'a dyn MetadataSink,
1532 policy: WorkflowRunPolicy,
1533}
1534
1535impl<'a> WorkflowCollectionContext<'a> {
1536 const fn new(
1537 cancellation: &'a CancellationHandle,
1538 workflow: &'a WorkflowDefinition,
1539 execution: &'a ExecutionMetadata,
1540 metadata_sink: &'a dyn MetadataSink,
1541 policy: WorkflowRunPolicy,
1542 ) -> Self {
1543 Self {
1544 cancellation,
1545 workflow,
1546 execution,
1547 metadata_sink,
1548 policy,
1549 }
1550 }
1551}
1552
1553async fn collect_workflow_summary<F>(
1554 mut node_runs: FuturesUnordered<F>,
1555 context: WorkflowCollectionContext<'_>,
1556 scheduled_node_count: usize,
1557) -> Result<WorkflowRunSummary>
1558where
1559 F: Future<Output = (NodeId, Result<()>)>,
1560{
1561 let mut summary: WorkflowRunSummary = WorkflowRunSummary::new(scheduled_node_count);
1562
1563 match context.policy.watchdog_policy() {
1564 WorkflowWatchdogPolicy::Disabled => {
1565 collect_workflow_summary_until_complete(&mut node_runs, context, &mut summary).await?;
1566 }
1567 WorkflowWatchdogPolicy::Deadlock(watchdog) => {
1568 collect_workflow_summary_with_deadlock_watchdog(
1569 &mut node_runs,
1570 context,
1571 watchdog,
1572 &mut summary,
1573 )
1574 .await?;
1575 }
1576 }
1577
1578 Ok(summary)
1579}
1580
1581async fn collect_workflow_summary_until_complete<F>(
1582 node_runs: &mut FuturesUnordered<F>,
1583 context: WorkflowCollectionContext<'_>,
1584 summary: &mut WorkflowRunSummary,
1585) -> Result<()>
1586where
1587 F: Future<Output = (NodeId, Result<()>)>,
1588{
1589 while let Some((_node_id, result)) = node_runs.next().await {
1590 record_node_run_result(context, summary, result)?;
1591 }
1592
1593 Ok(())
1594}
1595
1596async fn collect_workflow_summary_with_deadlock_watchdog<F>(
1597 node_runs: &mut FuturesUnordered<F>,
1598 context: WorkflowCollectionContext<'_>,
1599 watchdog: DeadlockWatchdogPolicy,
1600 summary: &mut WorkflowRunSummary,
1601) -> Result<()>
1602where
1603 F: Future<Output = (NodeId, Result<()>)>,
1604{
1605 loop {
1606 if node_runs.is_empty() {
1607 return Ok(());
1608 }
1609
1610 let next_node_result: Next<'_, FuturesUnordered<F>> = node_runs.next();
1611 let watchdog_deadline: BoxFuture<'static, ()> =
1612 deadlock_watchdog_deadline(watchdog.no_progress_timeout())?;
1613 futures::pin_mut!(next_node_result);
1614 futures::pin_mut!(watchdog_deadline);
1615
1616 match select(next_node_result, watchdog_deadline).await {
1617 Either::Left((Some((_node_id, result)), _deadline)) => {
1618 record_node_run_result(context, summary, result)?;
1619 }
1620 Either::Left((None, _deadline)) => return Ok(()),
1621 Either::Right(((), _next_node_result)) => {
1622 let diagnostic: WorkflowDeadlockDiagnostic = WorkflowDeadlockDiagnostic::from_run(
1623 context.workflow,
1624 summary,
1625 context.policy,
1626 watchdog,
1627 );
1628 let err: PureflowError = PureflowError::execution(diagnostic.to_string());
1629 record_first_workflow_error_with_diagnostic(context, &err, &diagnostic)?;
1630 summary.record_workflow_error(err);
1631 summary.record_deadlock_diagnostic(diagnostic);
1632 return Ok(());
1633 }
1634 }
1635 }
1636}
1637
1638fn record_node_run_result(
1639 context: WorkflowCollectionContext<'_>,
1640 summary: &mut WorkflowRunSummary,
1641 result: Result<()>,
1642) -> Result<()> {
1643 match result {
1644 Ok(()) => summary.record_success(),
1645 Err(err) => {
1646 if summary.first_error().is_none() {
1647 record_first_workflow_error(context, &err)?;
1648 }
1649 summary.record_error(err);
1650 }
1651 }
1652
1653 Ok(())
1654}
1655
1656fn record_first_workflow_error(
1657 context: WorkflowCollectionContext<'_>,
1658 err: &PureflowError,
1659) -> Result<()> {
1660 let record: MetadataRecord = MetadataRecord::Error(ErrorMetadataRecord::workflow_failed(
1661 context.workflow.id().clone(),
1662 context.execution.clone(),
1663 err.clone(),
1664 ));
1665 context.metadata_sink.record(&record)?;
1666 let _first_request: bool = context
1667 .cancellation
1668 .cancel(CancellationRequest::new(format!(
1669 "node execution failed: {err}"
1670 )));
1671 Ok(())
1672}
1673
1674fn record_first_workflow_error_with_diagnostic(
1675 context: WorkflowCollectionContext<'_>,
1676 err: &PureflowError,
1677 diagnostic: &WorkflowDeadlockDiagnostic,
1678) -> Result<()> {
1679 let record: MetadataRecord =
1680 MetadataRecord::Error(ErrorMetadataRecord::workflow_failed_with_diagnostic(
1681 context.workflow.id().clone(),
1682 context.execution.clone(),
1683 err.clone(),
1684 diagnostic.to_metadata_diagnostic(),
1685 ));
1686 context.metadata_sink.record(&record)?;
1687 let _first_request: bool = context
1688 .cancellation
1689 .cancel(CancellationRequest::new(format!(
1690 "node execution failed: {err}"
1691 )));
1692 Ok(())
1693}
1694
1695fn deadlock_watchdog_deadline(timeout: Duration) -> Result<BoxFuture<'static, ()>> {
1696 let (sender, receiver): (oneshot::Sender<()>, oneshot::Receiver<()>) = oneshot::channel();
1697 std::thread::Builder::new()
1698 .name(String::from("pureflow-deadlock-watchdog"))
1699 .spawn(move || {
1700 std::thread::sleep(timeout);
1701 let _send_result: std::result::Result<(), ()> = sender.send(());
1702 })
1703 .map_err(|err: std::io::Error| {
1704 PureflowError::execution(format!("failed to start workflow deadlock watchdog: {err}"))
1705 })?;
1706
1707 Ok(Box::pin(async move {
1708 let _deadline_result: std::result::Result<(), oneshot::Canceled> = receiver.await;
1709 }))
1710}
1711
1712fn validate_workflow_run_policy(
1713 workflow: &WorkflowDefinition,
1714 policy: WorkflowRunPolicy,
1715) -> Result<()> {
1716 match workflow.graph().topological_order() {
1717 Ok(_order) => Ok(()),
1718 Err(WorkflowValidationError::CycleDetected { cycle }) => {
1719 validate_cycle_run_policy(workflow, policy, &cycle)
1720 }
1721 Err(err) => Err(PureflowError::execution(format!(
1722 "workflow `{}` topology validation failed before execution: {err}",
1723 workflow.id()
1724 ))),
1725 }
1726}
1727
1728fn validate_cycle_run_policy(
1729 workflow: &WorkflowDefinition,
1730 policy: WorkflowRunPolicy,
1731 cycle: &[NodeId],
1732) -> Result<()> {
1733 match policy.cycle_policy() {
1734 CycleRunPolicy::Reject => Err(PureflowError::execution(format!(
1735 "workflow `{}` contains directed cycle {}; use an explicit feedback-loop run policy to execute cyclic graphs",
1736 workflow.id(),
1737 cycle_label(cycle)
1738 ))),
1739 CycleRunPolicy::AllowFeedbackLoops(feedback_loop) => {
1740 match (feedback_loop.startup(), feedback_loop.termination()) {
1741 (FeedbackLoopStartup::StartAllNodes, FeedbackLoopTermination::AllNodesComplete) => {
1742 Ok(())
1743 }
1744 }
1745 }
1746 }
1747}
1748
1749fn cycle_label(cycle: &[NodeId]) -> String {
1750 let mut label: String = String::new();
1751 for (index, node_id) in cycle.iter().enumerate() {
1752 if index > 0 {
1753 label.push_str(" -> ");
1754 }
1755 label.push_str(node_id.as_str());
1756 }
1757 label
1758}
1759
/// Per-node port handles produced by `build_port_wiring`: input handles keyed
/// by node id first, output handles keyed by node id second.
type PortWiring = (
    BTreeMap<NodeId, Vec<InputPortHandle>>,
    BTreeMap<NodeId, Vec<OutputPortHandle>>,
);
1764
1765fn build_port_wiring(workflow: &WorkflowDefinition) -> PortWiring {
1766 let mut inputs_by_node: BTreeMap<NodeId, Vec<InputPortHandle>> = BTreeMap::new();
1767 let mut outputs_by_node: BTreeMap<NodeId, Vec<OutputPortHandle>> = BTreeMap::new();
1768
1769 for edge in workflow.edges() {
1770 let capacity: NonZeroUsize = edge.capacity().resolve(DEFAULT_EDGE_CAPACITY);
1771 let (output, input): (OutputPortHandle, InputPortHandle) = bounded_edge_channel(
1772 edge.source().port_id().clone(),
1773 edge.target().port_id().clone(),
1774 capacity,
1775 );
1776 outputs_by_node
1777 .entry(edge.source().node_id().clone())
1778 .or_default()
1779 .push(output);
1780 inputs_by_node
1781 .entry(edge.target().node_id().clone())
1782 .or_default()
1783 .push(input);
1784 }
1785
1786 (inputs_by_node, outputs_by_node)
1787}
1788
1789#[cfg(test)]
1790mod tests {
1791 use super::*;
1792 use std::{
1793 collections::BTreeMap,
1794 future::{Ready, ready},
1795 sync::{Arc, Mutex},
1796 time::Duration,
1797 };
1798
1799 use pureflow_contract::{Determinism, ExecutionMode, PortContract, SchemaRef};
1800 use pureflow_core::{
1801 BatchExecutor, BatchInputs, BatchOutputs, PureflowError, ErrorCode, ErrorDiagnosticMetadata,
1802 ErrorMetadataKind, MetadataRecord, MetadataSink, PacketPayload, PortPacket, PortRecvError,
1803 PortSendError, RetryDisposition,
1804 lifecycle::{LifecycleEvent, LifecycleEventKind},
1805 message::{MessageEndpoint, MessageMetadata, MessageRoute},
1806 };
1807 use pureflow_test_kit::{
1808 FailingExecutor, NodeBuilder, RecordingExecutor, WorkflowBuilder, execution_metadata,
1809 node_id, port_id, workflow_id,
1810 };
1811 use pureflow_types::{ExecutionId, MessageId};
1812 use pureflow_workflow::EdgeDefinition;
1813 use futures::channel::oneshot;
1814 use futures::executor::block_on;
1815 use futures::future::BoxFuture;
1816
1817 #[derive(Debug, Default)]
1818 struct ChannelExecutor {
1819 received: Mutex<Vec<Vec<u8>>>,
1820 }
1821
1822 impl ChannelExecutor {
1823 fn received_payloads(&self) -> Vec<Vec<u8>> {
1824 self.received
1825 .lock()
1826 .expect("channel executor lock should not be poisoned")
1827 .clone()
1828 }
1829 }
1830
1831 impl NodeExecutor for ChannelExecutor {
1832 type RunFuture<'a> = BoxFuture<'a, Result<()>>;
1833
1834 fn run(
1835 &self,
1836 ctx: NodeContext,
1837 mut inputs: PortsIn,
1838 outputs: PortsOut,
1839 ) -> Self::RunFuture<'_> {
1840 Box::pin(async move {
1841 if ctx.node_id().as_str() == "source" {
1842 let cancellation = ctx.cancellation_token();
1843 outputs
1844 .send(&port_id("out"), packet(b"hello"), &cancellation)
1845 .await?;
1846 outputs
1847 .send(&port_id("out"), packet(b"world"), &cancellation)
1848 .await?;
1849 } else if ctx.node_id().as_str() == "sink" {
1850 let cancellation = ctx.cancellation_token();
1851 for _packet_index in 0..2 {
1852 let packet: PortPacket = inputs
1853 .recv(&port_id("in"), &cancellation)
1854 .await?
1855 .expect("source should have queued a packet");
1856 self.received
1857 .lock()
1858 .expect("channel executor lock should not be poisoned")
1859 .push(
1860 packet
1861 .into_payload()
1862 .as_bytes()
1863 .expect("channel test sends bytes")
1864 .to_vec(),
1865 );
1866 }
1867 }
1868
1869 Ok(())
1870 })
1871 }
1872 }
1873
1874 #[derive(Debug, Clone, Copy)]
1875 enum RegistryExecutorRole {
1876 Source,
1877 Sink,
1878 }
1879
1880 #[derive(Debug, Clone)]
1881 struct RegistryExecutor {
1882 role: RegistryExecutorRole,
1883 received: Arc<Mutex<Vec<Vec<u8>>>>,
1884 }
1885
1886 impl RegistryExecutor {
1887 fn source(received: Arc<Mutex<Vec<Vec<u8>>>>) -> Self {
1888 Self {
1889 role: RegistryExecutorRole::Source,
1890 received,
1891 }
1892 }
1893
1894 fn sink(received: Arc<Mutex<Vec<Vec<u8>>>>) -> Self {
1895 Self {
1896 role: RegistryExecutorRole::Sink,
1897 received,
1898 }
1899 }
1900 }
1901
1902 impl NodeExecutor for RegistryExecutor {
1903 type RunFuture<'a> = BoxFuture<'a, Result<()>>;
1904
1905 fn run(
1906 &self,
1907 ctx: NodeContext,
1908 mut inputs: PortsIn,
1909 outputs: PortsOut,
1910 ) -> Self::RunFuture<'_> {
1911 Box::pin(async move {
1912 let cancellation = ctx.cancellation_token();
1913 match self.role {
1914 RegistryExecutorRole::Source => {
1915 outputs
1916 .send(&port_id("out"), packet(b"registered"), &cancellation)
1917 .await?;
1918 }
1919 RegistryExecutorRole::Sink => {
1920 let packet: PortPacket = inputs
1921 .recv(&port_id("in"), &cancellation)
1922 .await?
1923 .expect("registered source should send one packet");
1924 self.received
1925 .lock()
1926 .expect("registry executor lock should not be poisoned")
1927 .push(packet_payload_bytes(packet));
1928 }
1929 }
1930
1931 Ok(())
1932 })
1933 }
1934 }
1935
1936 #[derive(Debug, Default)]
1937 struct BoundedBackpressureExecutor {
1938 events: Mutex<Vec<String>>,
1939 received: Mutex<Vec<Vec<u8>>>,
1940 }
1941
1942 impl BoundedBackpressureExecutor {
1943 fn events(&self) -> Vec<String> {
1944 self.events
1945 .lock()
1946 .expect("backpressure executor events lock should not be poisoned")
1947 .clone()
1948 }
1949
1950 fn received_payloads(&self) -> Vec<Vec<u8>> {
1951 self.received
1952 .lock()
1953 .expect("backpressure executor received lock should not be poisoned")
1954 .clone()
1955 }
1956
1957 fn push_event(&self, event: &str) {
1958 self.events
1959 .lock()
1960 .expect("backpressure executor events lock should not be poisoned")
1961 .push(event.to_owned());
1962 }
1963
1964 fn push_received(&self, packet: PortPacket) {
1965 self.received
1966 .lock()
1967 .expect("backpressure executor received lock should not be poisoned")
1968 .push(packet_payload_bytes(packet));
1969 }
1970 }
1971
    /// The node id selects the role.
    ///
    /// `source` sends one packet, expects an immediate `try_send` to be rejected
    /// with `PortSendError::Full`, then sends a second packet with the awaiting
    /// `send`. `sink` receives two packets. Any other node completes immediately.
    impl NodeExecutor for BoundedBackpressureExecutor {
        type RunFuture<'a> = BoxFuture<'a, Result<()>>;

        fn run(
            &self,
            ctx: NodeContext,
            mut inputs: PortsIn,
            outputs: PortsOut,
        ) -> Self::RunFuture<'_> {
            Box::pin(async move {
                let cancellation = ctx.cancellation_token();

                match ctx.node_id().as_str() {
                    "source" => {
                        // First packet goes out through the awaiting send.
                        outputs
                            .send(
                                &port_id("out"),
                                packet_between(b"first", "source", "sink"),
                                &cancellation,
                            )
                            .await?;

                        // A non-blocking send right after must report a full edge;
                        // any other outcome fails the node.
                        let full_send: std::result::Result<(), PortSendError> = outputs.try_send(
                            &port_id("out"),
                            packet_between(b"blocked", "source", "sink"),
                        );
                        if matches!(full_send, Err(PortSendError::Full { .. })) {
                            self.push_event("source-observed-full-edge");
                        } else {
                            return Err(PureflowError::execution(
                                "bounded edge should reject a second immediate send",
                            ));
                        }

                        // The awaiting send is expected to complete eventually.
                        outputs
                            .send(
                                &port_id("out"),
                                packet_between(b"second", "source", "sink"),
                                &cancellation,
                            )
                            .await?;
                        self.push_event("source-second-send-completed");
                    }
                    "sink" => {
                        // Receives exactly two packets and records their bytes.
                        for _packet_index in 0..2 {
                            let packet: PortPacket = inputs
                                .recv(&port_id("in"), &cancellation)
                                .await?
                                .expect("source should send two packets");
                            self.push_received(packet);
                        }
                    }
                    _ => {}
                }

                Ok(())
            })
        }
    }
2031
2032 #[derive(Debug, Default)]
2033 struct FanOutExecutor {
2034 received_by_node: Mutex<BTreeMap<String, Vec<Vec<u8>>>>,
2035 }
2036
2037 impl FanOutExecutor {
2038 fn received_by_node(&self) -> BTreeMap<String, Vec<Vec<u8>>> {
2039 self.received_by_node
2040 .lock()
2041 .expect("fan-out executor lock should not be poisoned")
2042 .clone()
2043 }
2044
2045 fn push_received(&self, node_id: &str, packet: PortPacket) {
2046 self.received_by_node
2047 .lock()
2048 .expect("fan-out executor lock should not be poisoned")
2049 .entry(node_id.to_owned())
2050 .or_default()
2051 .push(packet_payload_bytes(packet));
2052 }
2053 }
2054
2055 impl NodeExecutor for FanOutExecutor {
2056 type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2057
2058 fn run(
2059 &self,
2060 ctx: NodeContext,
2061 mut inputs: PortsIn,
2062 outputs: PortsOut,
2063 ) -> Self::RunFuture<'_> {
2064 Box::pin(async move {
2065 let cancellation = ctx.cancellation_token();
2066
2067 if ctx.node_id().as_str() == "source" {
2068 outputs
2069 .send(
2070 &port_id("out"),
2071 packet_between(b"fan", "source", "left"),
2072 &cancellation,
2073 )
2074 .await?;
2075 return Ok(());
2076 }
2077
2078 let node_name: String = ctx.node_id().to_string();
2079 let packet: PortPacket = inputs
2080 .recv(&port_id("in"), &cancellation)
2081 .await?
2082 .expect("fan-out sink should receive one packet");
2083 self.push_received(&node_name, packet);
2084
2085 Ok(())
2086 })
2087 }
2088 }
2089
2090 #[derive(Debug, Default)]
2091 struct FanInClosureExecutor {
2092 received: Mutex<Vec<Vec<u8>>>,
2093 closure_observed: Mutex<bool>,
2094 }
2095
2096 impl FanInClosureExecutor {
2097 fn received_payloads(&self) -> Vec<Vec<u8>> {
2098 self.received
2099 .lock()
2100 .expect("fan-in executor received lock should not be poisoned")
2101 .clone()
2102 }
2103
2104 fn closure_observed(&self) -> bool {
2105 *self
2106 .closure_observed
2107 .lock()
2108 .expect("fan-in executor closure lock should not be poisoned")
2109 }
2110
2111 fn push_received(&self, packet: PortPacket) {
2112 self.received
2113 .lock()
2114 .expect("fan-in executor received lock should not be poisoned")
2115 .push(packet_payload_bytes(packet));
2116 }
2117 }
2118
    /// The node id selects the role.
    ///
    /// `left` and `right` each send one packet whose payload is their own node
    /// name. `collector` receives two packets, then expects a third `recv` to
    /// fail with `PortRecvError::Disconnected`. Other nodes complete immediately.
    impl NodeExecutor for FanInClosureExecutor {
        type RunFuture<'a> = BoxFuture<'a, Result<()>>;

        fn run(
            &self,
            ctx: NodeContext,
            mut inputs: PortsIn,
            outputs: PortsOut,
        ) -> Self::RunFuture<'_> {
            Box::pin(async move {
                let cancellation = ctx.cancellation_token();

                match ctx.node_id().as_str() {
                    "left" | "right" => {
                        // The payload is the sender's own node name.
                        let source_node: String = ctx.node_id().to_string();
                        outputs
                            .send(
                                &port_id("out"),
                                packet_between(source_node.as_bytes(), &source_node, "collector"),
                                &cancellation,
                            )
                            .await?;
                    }
                    "collector" => {
                        for _packet_index in 0..2 {
                            let packet: PortPacket = inputs
                                .recv(&port_id("in"), &cancellation)
                                .await?
                                .expect("fan-in collector should receive both packets");
                            self.push_received(packet);
                        }

                        // After both packets, only a disconnect is acceptable;
                        // any other outcome fails the node.
                        let closed: std::result::Result<Option<PortPacket>, PortRecvError> =
                            inputs.recv(&port_id("in"), &cancellation).await;
                        if matches!(closed, Err(PortRecvError::Disconnected { .. })) {
                            *self
                                .closure_observed
                                .lock()
                                .expect("fan-in executor closure lock should not be poisoned") =
                                true;
                        } else {
                            return Err(PureflowError::execution(
                                "fan-in input should close after upstream senders finish",
                            ));
                        }
                    }
                    _ => {}
                }

                Ok(())
            })
        }
    }
2172
2173 #[derive(Debug, Default)]
2174 struct AggregateFailureExecutor {
2175 visited: Mutex<Vec<String>>,
2176 }
2177
2178 impl AggregateFailureExecutor {
2179 fn visited_node_names(&self) -> Vec<String> {
2180 self.visited
2181 .lock()
2182 .expect("aggregate failure executor lock should not be poisoned")
2183 .clone()
2184 }
2185 }
2186
2187 impl NodeExecutor for AggregateFailureExecutor {
2188 type RunFuture<'a> = Ready<Result<()>>;
2189
2190 fn run(
2191 &self,
2192 ctx: NodeContext,
2193 _inputs: PortsIn,
2194 _outputs: PortsOut,
2195 ) -> Self::RunFuture<'_> {
2196 self.visited
2197 .lock()
2198 .expect("aggregate failure executor lock should not be poisoned")
2199 .push(ctx.node_id().to_string());
2200
2201 if ctx.node_id().as_str() == "first" {
2202 return ready(Err(PureflowError::execution("first failed")));
2203 }
2204
2205 ready(Ok(()))
2206 }
2207 }
2208
2209 #[derive(Debug, Clone, Copy)]
2210 enum FailureMatrixRole {
2211 SourceFails,
2212 SourceForTransformFailure,
2213 TransformFails,
2214 PassthroughTransform,
2215 SinkWaits,
2216 }
2217
2218 #[derive(Debug, Clone, Copy)]
2219 struct FailureMatrixExecutor {
2220 role: FailureMatrixRole,
2221 }
2222
2223 impl FailureMatrixExecutor {
2224 const fn source_fails() -> Self {
2225 Self {
2226 role: FailureMatrixRole::SourceFails,
2227 }
2228 }
2229
2230 const fn source_for_transform_failure() -> Self {
2231 Self {
2232 role: FailureMatrixRole::SourceForTransformFailure,
2233 }
2234 }
2235
2236 const fn transform_fails() -> Self {
2237 Self {
2238 role: FailureMatrixRole::TransformFails,
2239 }
2240 }
2241
2242 const fn passthrough_transform() -> Self {
2243 Self {
2244 role: FailureMatrixRole::PassthroughTransform,
2245 }
2246 }
2247
2248 const fn sink_waits() -> Self {
2249 Self {
2250 role: FailureMatrixRole::SinkWaits,
2251 }
2252 }
2253 }
2254
    /// Behaviour depends only on the executor's role, not on the node id.
    impl NodeExecutor for FailureMatrixExecutor {
        type RunFuture<'a> = BoxFuture<'a, Result<()>>;

        fn run(
            &self,
            ctx: NodeContext,
            mut inputs: PortsIn,
            outputs: PortsOut,
        ) -> Self::RunFuture<'_> {
            Box::pin(async move {
                let cancellation = ctx.cancellation_token();

                match self.role {
                    // Fails immediately without touching any port.
                    FailureMatrixRole::SourceFails => {
                        Err(PureflowError::execution("matrix source failed"))
                    }
                    // Sends one packet; a send error becomes the node error.
                    FailureMatrixRole::SourceForTransformFailure => outputs
                        .send(
                            &port_id("out"),
                            packet_between(b"source", "source", "transform"),
                            &cancellation,
                        )
                        .await
                        .map_err(PureflowError::from),
                    // Waits for one receive to finish, then fails.
                    FailureMatrixRole::TransformFails => {
                        let _packet = inputs.recv(&port_id("in"), &cancellation).await?;
                        Err(PureflowError::execution("matrix transform failed"))
                    }
                    // Forwards the received packet unchanged to `out`.
                    FailureMatrixRole::PassthroughTransform => {
                        let packet = inputs
                            .recv(&port_id("in"), &cancellation)
                            .await?
                            .expect("source should send transform input");
                        outputs
                            .send(&port_id("out"), packet, &cancellation)
                            .await
                            .map_err(PureflowError::from)
                    }
                    // Waits for one receive to finish; a receive error is propagated.
                    FailureMatrixRole::SinkWaits => {
                        let _packet = inputs.recv(&port_id("in"), &cancellation).await?;
                        Ok(())
                    }
                }
            })
        }
    }
2301
2302 #[derive(Debug)]
2303 struct DisconnectedDownstreamExecutor {
2304 role: DisconnectedDownstreamRole,
2305 signal: Arc<DisconnectedDownstreamSignal>,
2306 }
2307
2308 #[derive(Debug, Clone, Copy)]
2309 enum DisconnectedDownstreamRole {
2310 Source,
2311 Sink,
2312 }
2313
2314 #[derive(Debug)]
2315 struct DisconnectedDownstreamSignal {
2316 sender: Mutex<Option<oneshot::Sender<()>>>,
2317 receiver: Mutex<Option<oneshot::Receiver<()>>>,
2318 }
2319
2320 impl DisconnectedDownstreamExecutor {
2321 fn pair() -> (Self, Self) {
2322 let (sender, receiver) = oneshot::channel();
2323 let signal = Arc::new(DisconnectedDownstreamSignal {
2324 sender: Mutex::new(Some(sender)),
2325 receiver: Mutex::new(Some(receiver)),
2326 });
2327
2328 (
2329 Self {
2330 role: DisconnectedDownstreamRole::Source,
2331 signal: Arc::clone(&signal),
2332 },
2333 Self {
2334 role: DisconnectedDownstreamRole::Sink,
2335 signal,
2336 },
2337 )
2338 }
2339 }
2340
    /// The sink fires the shared signal and returns; the source waits for that
    /// signal before sending its only packet.
    ///
    /// NOTE(review): the fixture name suggests the send is expected to hit a
    /// disconnected downstream; confirm against the test that uses it.
    impl NodeExecutor for DisconnectedDownstreamExecutor {
        type RunFuture<'a> = BoxFuture<'a, Result<()>>;

        fn run(
            &self,
            ctx: NodeContext,
            _inputs: PortsIn,
            outputs: PortsOut,
        ) -> Self::RunFuture<'_> {
            Box::pin(async move {
                match self.role {
                    DisconnectedDownstreamRole::Source => {
                        // Take the receiver out of the shared slot; it can only be taken once.
                        let receiver = self
                            .signal
                            .receiver
                            .lock()
                            .expect("disconnect receiver lock should not be poisoned")
                            .take()
                            .expect("source should own disconnect receiver");
                        // Block until the sink has signalled.
                        receiver
                            .await
                            .expect("sink should signal before source sends");
                        // A send error becomes the node error.
                        outputs
                            .send(
                                &port_id("out"),
                                packet_between(b"late", "source", "sink"),
                                &ctx.cancellation_token(),
                            )
                            .await
                            .map_err(PureflowError::from)
                    }
                    DisconnectedDownstreamRole::Sink => {
                        // Fire the signal if it has not been taken yet; the send result is ignored.
                        if let Some(sender) = self
                            .signal
                            .sender
                            .lock()
                            .expect("disconnect sender lock should not be poisoned")
                            .take()
                        {
                            let _send_result = sender.send(());
                        }
                        Ok(())
                    }
                }
            })
        }
    }
2388
2389 #[derive(Debug, Clone, Copy)]
2390 enum FanOutPartialFailureRole {
2391 Source,
2392 GoodSink,
2393 DroppingSink,
2394 }
2395
2396 #[derive(Debug)]
2397 struct FanOutPartialFailureExecutor {
2398 role: FanOutPartialFailureRole,
2399 state: Arc<FanOutPartialFailureState>,
2400 }
2401
2402 #[derive(Debug)]
2403 struct FanOutPartialFailureState {
2404 dropped_sender: Mutex<Option<oneshot::Sender<()>>>,
2405 dropped_receiver: Mutex<Option<oneshot::Receiver<()>>>,
2406 good_sink_observation: Mutex<Option<String>>,
2407 }
2408
2409 impl FanOutPartialFailureExecutor {
2410 fn registry() -> StaticNodeExecutorRegistry<Self> {
2411 let (dropped_sender, dropped_receiver) = oneshot::channel();
2412 let state = Arc::new(FanOutPartialFailureState {
2413 dropped_sender: Mutex::new(Some(dropped_sender)),
2414 dropped_receiver: Mutex::new(Some(dropped_receiver)),
2415 good_sink_observation: Mutex::new(None),
2416 });
2417
2418 StaticNodeExecutorRegistry::new(BTreeMap::from([
2419 (
2420 node_id("source"),
2421 Self {
2422 role: FanOutPartialFailureRole::Source,
2423 state: Arc::clone(&state),
2424 },
2425 ),
2426 (
2427 node_id("good"),
2428 Self {
2429 role: FanOutPartialFailureRole::GoodSink,
2430 state: Arc::clone(&state),
2431 },
2432 ),
2433 (
2434 node_id("drop"),
2435 Self {
2436 role: FanOutPartialFailureRole::DroppingSink,
2437 state,
2438 },
2439 ),
2440 ]))
2441 }
2442
2443 fn good_sink_observation(&self) -> Option<String> {
2444 self.state
2445 .good_sink_observation
2446 .lock()
2447 .expect("fan-out partial observation lock should not be poisoned")
2448 .clone()
2449 }
2450 }
2451
    /// The dropping sink fires the shared signal and returns; the source waits
    /// for that signal before sending; the good sink records how its single
    /// receive ended and fails only if a packet was actually delivered.
    impl NodeExecutor for FanOutPartialFailureExecutor {
        type RunFuture<'a> = BoxFuture<'a, Result<()>>;

        fn run(
            &self,
            ctx: NodeContext,
            mut inputs: PortsIn,
            outputs: PortsOut,
        ) -> Self::RunFuture<'_> {
            Box::pin(async move {
                match self.role {
                    FanOutPartialFailureRole::Source => {
                        // Take the receiver out of the shared slot; it can only be taken once.
                        let receiver = self
                            .state
                            .dropped_receiver
                            .lock()
                            .expect("fan-out dropped receiver lock should not be poisoned")
                            .take()
                            .expect("source should own dropped receiver");
                        // Block until the dropping sink has signalled.
                        receiver
                            .await
                            .expect("dropping sink should signal before source sends");
                        // A send error becomes the node error.
                        outputs
                            .send(
                                &port_id("out"),
                                packet_between(b"fan", "source", "good"),
                                &ctx.cancellation_token(),
                            )
                            .await
                            .map_err(PureflowError::from)
                    }
                    FanOutPartialFailureRole::GoodSink => {
                        // Classify the receive outcome as a label for later assertions.
                        let observation =
                            match inputs.recv(&port_id("in"), &ctx.cancellation_token()).await {
                                Ok(Some(_packet)) => "unexpected_packet",
                                Ok(None) => "closed_without_packet",
                                Err(PortRecvError::Cancelled { .. }) => "cancelled_without_packet",
                                Err(PortRecvError::Disconnected { .. }) => {
                                    "disconnected_without_packet"
                                }
                                Err(PortRecvError::UnknownPort { .. }) => "unknown_port",
                            };
                        *self
                            .state
                            .good_sink_observation
                            .lock()
                            .expect("fan-out partial observation lock should not be poisoned") =
                            Some(observation.to_owned());
                        // Only an actual delivery is treated as a failure.
                        if observation == "unexpected_packet" {
                            return Err(PureflowError::execution(
                                "fan-out partial send delivered to good sink",
                            ));
                        }
                        Ok(())
                    }
                    FanOutPartialFailureRole::DroppingSink => {
                        // Fire the signal if it has not been taken yet; the send result is ignored.
                        if let Some(sender) = self
                            .state
                            .dropped_sender
                            .lock()
                            .expect("fan-out dropped sender lock should not be poisoned")
                            .take()
                        {
                            let _send_result = sender.send(());
                        }
                        Ok(())
                    }
                }
            })
        }
    }
2523
2524 #[derive(Debug, Default)]
2525 struct CancelledExecutor;
2526
2527 impl NodeExecutor for CancelledExecutor {
2528 type RunFuture<'a> = Ready<Result<()>>;
2529
2530 fn run(
2531 &self,
2532 _ctx: NodeContext,
2533 _inputs: PortsIn,
2534 _outputs: PortsOut,
2535 ) -> Self::RunFuture<'_> {
2536 ready(Err(PureflowError::cancelled("test cancellation")))
2537 }
2538 }
2539
2540 #[derive(Debug, Default)]
2541 struct WaitingInputExecutor {
2542 visited: Mutex<Vec<String>>,
2543 }
2544
2545 impl WaitingInputExecutor {
2546 fn visited_node_names(&self) -> Vec<String> {
2547 self.visited
2548 .lock()
2549 .expect("waiting input executor lock should not be poisoned")
2550 .clone()
2551 }
2552 }
2553
2554 impl NodeExecutor for WaitingInputExecutor {
2555 type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2556
2557 fn run(
2558 &self,
2559 ctx: NodeContext,
2560 mut inputs: PortsIn,
2561 outputs: PortsOut,
2562 ) -> Self::RunFuture<'_> {
2563 Box::pin(async move {
2564 let _held_outputs = outputs;
2565 self.visited
2566 .lock()
2567 .expect("waiting input executor lock should not be poisoned")
2568 .push(ctx.node_id().to_string());
2569
2570 let _packet = inputs
2571 .recv(&port_id("in"), &ctx.cancellation_token())
2572 .await?;
2573
2574 Ok(())
2575 })
2576 }
2577 }
2578
2579 #[derive(Debug, Clone, Copy)]
2580 enum FeedbackLoopExecutorRole {
2581 Driver,
2582 Counter,
2583 }
2584
2585 #[derive(Debug, Clone)]
2586 struct FeedbackLoopExecutor {
2587 role: FeedbackLoopExecutorRole,
2588 observed: Arc<Mutex<Vec<String>>>,
2589 }
2590
2591 impl FeedbackLoopExecutor {
2592 fn driver(observed: Arc<Mutex<Vec<String>>>) -> Self {
2593 Self {
2594 role: FeedbackLoopExecutorRole::Driver,
2595 observed,
2596 }
2597 }
2598
2599 fn counter(observed: Arc<Mutex<Vec<String>>>) -> Self {
2600 Self {
2601 role: FeedbackLoopExecutorRole::Counter,
2602 observed,
2603 }
2604 }
2605
2606 fn push_observed(&self, value: String) {
2607 self.observed
2608 .lock()
2609 .expect("feedback loop executor lock should not be poisoned")
2610 .push(value);
2611 }
2612 }
2613
    /// The driver sends a seed packet and then waits for one packet back; the
    /// counter waits for one packet and then sends a reply. Each role logs the
    /// payload it received, prefixed with its role name.
    impl NodeExecutor for FeedbackLoopExecutor {
        type RunFuture<'a> = BoxFuture<'a, Result<()>>;

        fn run(
            &self,
            ctx: NodeContext,
            mut inputs: PortsIn,
            outputs: PortsOut,
        ) -> Self::RunFuture<'_> {
            Box::pin(async move {
                let cancellation = ctx.cancellation_token();
                match self.role {
                    FeedbackLoopExecutorRole::Driver => {
                        // Send first, then receive.
                        outputs
                            .send(
                                &port_id("out"),
                                packet_between(b"seed", "first", "second"),
                                &cancellation,
                            )
                            .await?;
                        let packet: PortPacket = inputs
                            .recv(&port_id("in"), &cancellation)
                            .await?
                            .expect("counter should return one packet");
                        self.push_observed(format!(
                            "driver:{}",
                            String::from_utf8(packet_payload_bytes(packet))
                                .expect("feedback loop test payload should be UTF-8")
                        ));
                    }
                    FeedbackLoopExecutorRole::Counter => {
                        // Receive first, then reply.
                        let packet: PortPacket = inputs
                            .recv(&port_id("in"), &cancellation)
                            .await?
                            .expect("driver should seed the loop");
                        self.push_observed(format!(
                            "counter:{}",
                            String::from_utf8(packet_payload_bytes(packet))
                                .expect("feedback loop test payload should be UTF-8")
                        ));
                        outputs
                            .send(
                                &port_id("out"),
                                packet_between(b"ack", "second", "first"),
                                &cancellation,
                            )
                            .await?;
                    }
                }

                Ok(())
            })
        }
    }
2668
/// Role a `FeedbackLoopShutdownExecutor` plays when it runs.
#[derive(Debug, Clone, Copy)]
enum FeedbackLoopShutdownRole {
    /// Returns an execution error straight away.
    Failing,
    /// Polls the cancellation token until it reports cancelled, then records that fact.
    ShutdownWatcher,
}
2674
/// Test executor used to check that a failing node causes its sibling to be cancelled.
#[derive(Debug, Clone)]
struct FeedbackLoopShutdownExecutor {
    /// Which behaviour this instance performs.
    role: FeedbackLoopShutdownRole,
    /// Shared flag set to `true` once the watcher sees a cancelled token.
    cancellation_observed: Arc<Mutex<bool>>,
}
2680
2681 impl FeedbackLoopShutdownExecutor {
2682 fn failing(cancellation_observed: Arc<Mutex<bool>>) -> Self {
2683 Self {
2684 role: FeedbackLoopShutdownRole::Failing,
2685 cancellation_observed,
2686 }
2687 }
2688
2689 fn shutdown_watcher(cancellation_observed: Arc<Mutex<bool>>) -> Self {
2690 Self {
2691 role: FeedbackLoopShutdownRole::ShutdownWatcher,
2692 cancellation_observed,
2693 }
2694 }
2695 }
2696
2697 impl NodeExecutor for FeedbackLoopShutdownExecutor {
2698 type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2699
2700 fn run(
2701 &self,
2702 ctx: NodeContext,
2703 _inputs: PortsIn,
2704 _outputs: PortsOut,
2705 ) -> Self::RunFuture<'_> {
2706 Box::pin(async move {
2707 match self.role {
2708 FeedbackLoopShutdownRole::Failing => {
2709 Err(PureflowError::execution("feedback loop shutdown requested"))
2710 }
2711 FeedbackLoopShutdownRole::ShutdownWatcher => {
2712 let cancellation = ctx.cancellation_token();
2713 std::future::poll_fn(|task_cx: &mut std::task::Context<'_>| {
2714 if cancellation.is_cancelled() {
2715 *self
2716 .cancellation_observed
2717 .lock()
2718 .expect("shutdown executor lock should not be poisoned") = true;
2719 std::task::Poll::Ready(Ok(()))
2720 } else {
2721 task_cx.waker().wake_by_ref();
2722 std::task::Poll::Pending
2723 }
2724 })
2725 .await
2726 }
2727 }
2728 })
2729 }
2730 }
2731
/// Selects the source node named in the packet that `ContractOutputExecutor` sends.
#[derive(Debug, Clone, Copy)]
enum ContractOutputMode {
    /// The packet names `source` as its source node.
    MatchingSource,
    /// The packet names `other` as its source node.
    MismatchedSource,
}
2737
/// Selects the output port and packet source node used by `ContractBatchExecutor`.
#[derive(Debug, Clone, Copy)]
enum ContractBatchOutputMode {
    /// Pushes to port `out` with source node `source`.
    MatchingSource,
    /// Pushes to port `out` with source node `other`.
    MismatchedSource,
    /// Pushes to port `rogue` with source node `source`.
    UnknownPort,
}
2744
/// Test executor that sends one `contracted` packet on its `out` port.
#[derive(Debug, Clone, Copy)]
struct ContractOutputExecutor {
    /// Chooses the source node written into the packet.
    mode: ContractOutputMode,
}
2749
/// Test batch executor that returns one `contracted` packet.
#[derive(Debug, Clone, Copy)]
struct ContractBatchExecutor {
    /// Chooses the output port and the source node written into the packet.
    mode: ContractBatchOutputMode,
}
2754
2755 impl ContractOutputExecutor {
2756 const fn matching_source() -> Self {
2757 Self {
2758 mode: ContractOutputMode::MatchingSource,
2759 }
2760 }
2761
2762 const fn mismatched_source() -> Self {
2763 Self {
2764 mode: ContractOutputMode::MismatchedSource,
2765 }
2766 }
2767 }
2768
2769 impl ContractBatchExecutor {
2770 const fn matching_source() -> Self {
2771 Self {
2772 mode: ContractBatchOutputMode::MatchingSource,
2773 }
2774 }
2775
2776 const fn mismatched_source() -> Self {
2777 Self {
2778 mode: ContractBatchOutputMode::MismatchedSource,
2779 }
2780 }
2781
2782 const fn unknown_port() -> Self {
2783 Self {
2784 mode: ContractBatchOutputMode::UnknownPort,
2785 }
2786 }
2787 }
2788
2789 impl NodeExecutor for ContractOutputExecutor {
2790 type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2791
2792 fn run(
2793 &self,
2794 ctx: NodeContext,
2795 _inputs: PortsIn,
2796 outputs: PortsOut,
2797 ) -> Self::RunFuture<'_> {
2798 Box::pin(async move {
2799 let source_node: &str = match self.mode {
2800 ContractOutputMode::MatchingSource => "source",
2801 ContractOutputMode::MismatchedSource => "other",
2802 };
2803 outputs
2804 .send(
2805 &port_id("out"),
2806 packet_between(b"contracted", source_node, "sink"),
2807 &ctx.cancellation_token(),
2808 )
2809 .await?;
2810 Ok(())
2811 })
2812 }
2813 }
2814
2815 impl BatchExecutor for ContractBatchExecutor {
2816 fn invoke(&self, _inputs: BatchInputs) -> Result<BatchOutputs> {
2817 let (output_port, source_node): (&str, &str) = match self.mode {
2818 ContractBatchOutputMode::MatchingSource => ("out", "source"),
2819 ContractBatchOutputMode::MismatchedSource => ("out", "other"),
2820 ContractBatchOutputMode::UnknownPort => ("rogue", "source"),
2821 };
2822 let mut outputs: BatchOutputs = BatchOutputs::new();
2823 outputs.push(
2824 port_id(output_port),
2825 packet_between(b"contracted", source_node, "sink"),
2826 );
2827 Ok(outputs)
2828 }
2829 }
2830
/// Test executor that records the payload of the packet it receives on `in`.
#[derive(Debug)]
struct RecordingSinkExecutor {
    /// Shared list of received payloads, as raw bytes.
    received: Arc<Mutex<Vec<Vec<u8>>>>,
}
2835
2836 impl RecordingSinkExecutor {
2837 fn new(received: Arc<Mutex<Vec<Vec<u8>>>>) -> Self {
2838 Self { received }
2839 }
2840 }
2841
2842 impl NodeExecutor for RecordingSinkExecutor {
2843 type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2844
2845 fn run(
2846 &self,
2847 ctx: NodeContext,
2848 mut inputs: PortsIn,
2849 _outputs: PortsOut,
2850 ) -> Self::RunFuture<'_> {
2851 Box::pin(async move {
2852 match inputs.recv(&port_id("in"), &ctx.cancellation_token()).await {
2853 Ok(Some(packet)) => {
2854 self.received
2855 .lock()
2856 .expect("recording sink lock should not be poisoned")
2857 .push(packet_payload_bytes(packet));
2858 }
2859 Ok(None)
2860 | Err(PortRecvError::Disconnected { .. } | PortRecvError::Cancelled { .. }) => {
2861 }
2862 Err(err) => return Err(err.into()),
2863 }
2864
2865 Ok(())
2866 })
2867 }
2868 }
2869
/// Lets one registry hold both the batch executor and the recording sink.
#[derive(Debug)]
enum ContractBatchRegistryExecutor {
    /// A `ContractBatchExecutor` wrapped in a `BatchNodeExecutor`.
    Batch(BatchNodeExecutor<ContractBatchExecutor>),
    /// A sink that records the payloads it receives.
    Sink(RecordingSinkExecutor),
}
2875
2876 impl NodeExecutor for ContractBatchRegistryExecutor {
2877 type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2878
2879 fn run(&self, ctx: NodeContext, inputs: PortsIn, outputs: PortsOut) -> Self::RunFuture<'_> {
2880 match self {
2881 Self::Batch(executor) => executor.run(ctx, inputs, outputs),
2882 Self::Sink(executor) => executor.run(ctx, inputs, outputs),
2883 }
2884 }
2885 }
2886
/// Test executor whose behaviour depends on the node id (`fail`, `worker`, or anything else).
#[derive(Debug, Default)]
struct SiblingCancellationExecutor {
    /// Set to `true` when the `worker` node's receive ends with a cancellation error.
    cancellation_observed: Mutex<bool>,
}
2891
2892 impl SiblingCancellationExecutor {
2893 fn cancellation_observed(&self) -> bool {
2894 *self
2895 .cancellation_observed
2896 .lock()
2897 .expect("sibling cancellation executor lock should not be poisoned")
2898 }
2899 }
2900
/// Test executor that records the capacity reported for the `probe` node's `in` port.
#[derive(Debug, Default)]
struct CapacityProbeExecutor {
    /// One entry per run of the `probe` node, holding the `inputs.capacity` result.
    observed: Mutex<Vec<Option<usize>>>,
}
2905
2906 impl CapacityProbeExecutor {
2907 fn observed_capacities(&self) -> Vec<Option<usize>> {
2908 self.observed
2909 .lock()
2910 .expect("capacity probe executor lock should not be poisoned")
2911 .clone()
2912 }
2913 }
2914
/// Metadata sink that keeps a clone of every record it is given.
#[derive(Debug, Default)]
struct RecordingMetadataSink {
    /// Recorded metadata, in arrival order.
    records: Mutex<Vec<MetadataRecord>>,
}
2919
2920 impl RecordingMetadataSink {
2921 fn records(&self) -> Vec<MetadataRecord> {
2922 self.records
2923 .lock()
2924 .expect("metadata sink lock should not be poisoned")
2925 .clone()
2926 }
2927 }
2928
2929 impl MetadataSink for RecordingMetadataSink {
2930 fn record(&self, record: &MetadataRecord) -> Result<()> {
2931 self.records
2932 .lock()
2933 .expect("metadata sink lock should not be poisoned")
2934 .push(record.clone());
2935 Ok(())
2936 }
2937 }
2938
/// Lifecycle hook that remembers the kind of every event it observes.
#[derive(Debug, Default)]
struct RecordingLifecycleHook {
    /// Observed event kinds, in arrival order.
    events: Mutex<Vec<LifecycleEventKind>>,
}
2943
2944 impl RecordingLifecycleHook {
2945 fn recorded(&self) -> Vec<LifecycleEventKind> {
2946 self.events
2947 .lock()
2948 .expect("lifecycle hook lock should not be poisoned")
2949 .clone()
2950 }
2951 }
2952
2953 impl LifecycleHook for RecordingLifecycleHook {
2954 fn observe(&self, event: &LifecycleEvent) -> Result<()> {
2955 self.events
2956 .lock()
2957 .expect("lifecycle hook lock should not be poisoned")
2958 .push(event.kind());
2959 Ok(())
2960 }
2961 }
2962
2963 impl NodeExecutor for CapacityProbeExecutor {
2964 type RunFuture<'a> = Ready<Result<()>>;
2965
2966 fn run(
2967 &self,
2968 ctx: NodeContext,
2969 inputs: PortsIn,
2970 _outputs: PortsOut,
2971 ) -> Self::RunFuture<'_> {
2972 if ctx.node_id().as_str() == "probe" {
2973 let capacity = inputs.capacity(&port_id("in"));
2974 self.observed
2975 .lock()
2976 .expect("capacity probe executor lock should not be poisoned")
2977 .push(capacity);
2978 }
2979
2980 ready(Ok(()))
2981 }
2982 }
2983
2984 impl NodeExecutor for SiblingCancellationExecutor {
2985 type RunFuture<'a> = BoxFuture<'a, Result<()>>;
2986
2987 fn run(
2988 &self,
2989 ctx: NodeContext,
2990 mut inputs: PortsIn,
2991 _outputs: PortsOut,
2992 ) -> Self::RunFuture<'_> {
2993 Box::pin(async move {
2994 if ctx.node_id().as_str() == "fail" {
2995 return Err(PureflowError::execution("fail requested"));
2996 }
2997
2998 if ctx.node_id().as_str() == "worker" {
2999 let cancellation = ctx.cancellation_token();
3000 let result: std::result::Result<Option<PortPacket>, PortRecvError> =
3001 inputs.recv(&port_id("in"), &cancellation).await;
3002 if matches!(result, Err(PortRecvError::Cancelled { .. })) {
3003 *self
3004 .cancellation_observed
3005 .lock()
3006 .expect("sibling cancellation executor lock should not be poisoned") =
3007 true;
3008 return Ok(());
3009 }
3010
3011 return Err(PureflowError::execution(
3012 "worker input should be cancelled after sibling failure",
3013 ));
3014 }
3015
3016 Ok(())
3017 })
3018 }
3019 }
3020
3021 fn execution_id(value: &str) -> ExecutionId {
3022 ExecutionId::new(value).expect("valid execution id")
3023 }
3024
3025 fn message_id(value: &str) -> MessageId {
3026 MessageId::new(value).expect("valid message id")
3027 }
3028
3029 fn packet(value: &[u8]) -> PortPacket {
3030 packet_between(value, "source", "sink")
3031 }
3032
3033 fn packet_between(value: &[u8], source_node: &str, target_node: &str) -> PortPacket {
3034 let source: MessageEndpoint = MessageEndpoint::new(node_id(source_node), port_id("out"));
3035 let target: MessageEndpoint = MessageEndpoint::new(node_id(target_node), port_id("in"));
3036 let route: MessageRoute = MessageRoute::new(Some(source), target);
3037 let execution: ExecutionMetadata = ExecutionMetadata::first_attempt(execution_id("run-1"));
3038 let metadata: MessageMetadata =
3039 MessageMetadata::new(message_id("msg-1"), workflow_id("flow"), execution, route);
3040
3041 PortPacket::new(metadata, PacketPayload::from(value.to_vec()))
3042 }
3043
3044 fn schema(value: &str) -> SchemaRef {
3045 SchemaRef::new(value).expect("valid schema ref")
3046 }
3047
3048 fn source_output_contracts() -> Vec<NodeContract> {
3049 vec![
3050 NodeContract::new(
3051 node_id("source"),
3052 vec![PortContract::new(
3053 port_id("out"),
3054 PortDirection::Output,
3055 Some(schema("schema://packet")),
3056 )],
3057 ExecutionMode::Native,
3058 Determinism::Unknown,
3059 RetryDisposition::Unknown,
3060 )
3061 .expect("valid source contract"),
3062 ]
3063 }
3064
3065 fn cyclic_workflow() -> WorkflowDefinition {
3066 let first: NodeDefinition = NodeBuilder::new("first").input("in").output("out").build();
3067 let second: NodeDefinition = NodeBuilder::new("second").input("in").output("out").build();
3068 let graph = pureflow_workflow::WorkflowGraph::with_cycles_allowed(
3069 [first, second],
3070 [
3071 EdgeDefinition::new(
3072 pureflow_workflow::EdgeEndpoint::new(node_id("first"), port_id("out")),
3073 pureflow_workflow::EdgeEndpoint::new(node_id("second"), port_id("in")),
3074 ),
3075 EdgeDefinition::new(
3076 pureflow_workflow::EdgeEndpoint::new(node_id("second"), port_id("out")),
3077 pureflow_workflow::EdgeEndpoint::new(node_id("first"), port_id("in")),
3078 ),
3079 ],
3080 )
3081 .expect("cycle-allowed workflow graph should build");
3082
3083 WorkflowDefinition::new(workflow_id("flow"), graph)
3084 }
3085
3086 fn packet_payload_bytes(packet: PortPacket) -> Vec<u8> {
3087 packet
3088 .into_payload()
3089 .as_bytes()
3090 .expect("engine backpressure tests send bytes")
3091 .to_vec()
3092 }
3093
/// Checks the accessors of a feedback-loop run policy and the defaults of
/// `WorkflowRunPolicy`.
#[test]
fn workflow_run_policy_names_feedback_loop_startup_and_termination() {
    let loop_policy: FeedbackLoopRunPolicy =
        FeedbackLoopRunPolicy::start_all_nodes_until_complete();
    let run_policy: WorkflowRunPolicy = WorkflowRunPolicy::feedback_loops(loop_policy);
    let default_policy: WorkflowRunPolicy = WorkflowRunPolicy::default();

    // Feedback-loop policy exposes its startup and termination settings.
    assert_eq!(loop_policy.startup(), FeedbackLoopStartup::StartAllNodes);
    assert_eq!(
        loop_policy.termination(),
        FeedbackLoopTermination::AllNodesComplete
    );
    assert_eq!(
        run_policy.cycle_policy(),
        CycleRunPolicy::AllowFeedbackLoops(loop_policy)
    );

    // The default run policy rejects cycles and has no watchdog.
    assert_eq!(default_policy.cycle_policy(), CycleRunPolicy::Reject);
    assert_eq!(
        default_policy.watchdog_policy(),
        WorkflowWatchdogPolicy::Disabled
    );
}
3118
/// Runs two unconnected nodes and checks the context the executor recorded.
#[test]
fn run_workflow_passes_execution_metadata_to_each_node() {
    let executor: RecordingExecutor = RecordingExecutor::default();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("first").build())
        .node(NodeBuilder::new("second").build())
        .build();

    let outcome = block_on(run_workflow(&workflow, &execution, &executor));
    outcome.expect("workflow should run");

    let contexts: Vec<NodeContext> = executor.visited_contexts();
    let first_context: &NodeContext = &contexts[0];
    assert_eq!(first_context.workflow_id().as_str(), "flow");
    assert_eq!(first_context.execution().execution_id().as_str(), "run-1");
    assert_eq!(executor.visited_node_names(), vec!["first", "second"]);
}
3135
/// Runs a source -> sink workflow with a metadata sink and expects four lifecycle records.
#[test]
fn run_workflow_with_metadata_sink_records_lifecycle_events() {
    let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("source").output("out").build())
        .node(NodeBuilder::new("sink").input("in").build())
        .edge("source", "out", "sink", "in")
        .build();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    let executor: RecordingExecutor = RecordingExecutor::default();
    let sink: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());

    let outcome = block_on(run_workflow_with_metadata_sink(
        &workflow,
        &execution,
        &executor,
        sink.clone(),
    ));
    outcome.expect("metadata workflow run should succeed");

    let recorded: Vec<MetadataRecord> = sink.records();
    let lifecycle_count: usize = recorded
        .iter()
        .map(|record: &MetadataRecord| usize::from(matches!(record, MetadataRecord::Lifecycle(_))))
        .sum();

    assert_eq!(lifecycle_count, 4);
}
3162
/// Runs a one-node workflow with both a lifecycle hook and a metadata sink attached.
///
/// Expects a completed summary, a started/completed pair on the hook, and two
/// lifecycle records in the sink.
#[test]
fn run_workflow_with_observers_summary_records_lifecycle_hook_and_metadata() {
    let hook: RecordingLifecycleHook = RecordingLifecycleHook::default();
    let sink: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
    let executor: RecordingExecutor = RecordingExecutor::default();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("node").build())
        .build();

    let run = block_on(run_workflow_with_observers_summary(
        &workflow,
        &execution,
        &executor,
        &hook,
        sink.clone(),
    ));
    let summary: WorkflowRunSummary = run.expect("observer workflow run should return a summary");

    assert_eq!(summary.terminal_state(), WorkflowTerminalState::Completed);
    assert_eq!(summary.completed_node_count(), 1);

    let expected_kinds = vec![
        LifecycleEventKind::NodeStarted,
        LifecycleEventKind::NodeCompleted,
    ];
    assert_eq!(hook.recorded(), expected_kinds);

    let lifecycle_total: usize = sink
        .records()
        .iter()
        .filter(|record: &&MetadataRecord| matches!(record, MetadataRecord::Lifecycle(_)))
        .count();
    assert_eq!(lifecycle_total, 2);
}
3199
/// Runs a source -> sink workflow through a per-node executor registry.
///
/// Checks that the shared `received` buffer ends up holding exactly one
/// `registered` payload.
#[test]
fn run_workflow_with_registry_resolves_executor_per_node() {
    let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("source").output("out").build())
        .node(NodeBuilder::new("sink").input("in").build())
        .edge("source", "out", "sink", "in")
        .build();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    let received: Arc<Mutex<Vec<Vec<u8>>>> = Arc::new(Mutex::new(Vec::new()));
    // Each node id maps to its own executor; both share the `received` buffer.
    let registry: StaticNodeExecutorRegistry<RegistryExecutor> =
        StaticNodeExecutorRegistry::new(BTreeMap::from([
            (
                node_id("source"),
                RegistryExecutor::source(Arc::clone(&received)),
            ),
            (
                node_id("sink"),
                RegistryExecutor::sink(Arc::clone(&received)),
            ),
        ]));

    block_on(run_workflow_with_registry(&workflow, &execution, &registry))
        .expect("registry workflow should run");

    assert_eq!(
        *received
            .lock()
            .expect("registry test lock should not be poisoned"),
        vec![b"registered".to_vec()]
    );
}
3231
/// Runs a one-node workflow against an empty registry.
///
/// Expects a `NodeExecutionFailed` error that names the unregistered node.
#[test]
fn run_workflow_with_registry_rejects_missing_executor() {
    let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("missing").build())
        .build();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    // Deliberately empty: no executor is registered for node `missing`.
    let registry: StaticNodeExecutorRegistry<RegistryExecutor> =
        StaticNodeExecutorRegistry::new(BTreeMap::new());

    let err: PureflowError =
        block_on(run_workflow_with_registry(&workflow, &execution, &registry))
            .expect_err("missing registry entry should fail");

    assert_eq!(err.code(), ErrorCode::NodeExecutionFailed);
    assert!(
        err.to_string()
            .contains("no executor registered for workflow node `missing`")
    );
}
3251
/// Runs two unconnected nodes and checks every counter on the resulting summary.
#[test]
fn run_workflow_summary_reports_successful_terminal_state() {
    let executor: RecordingExecutor = RecordingExecutor::default();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("first").build())
        .node(NodeBuilder::new("second").build())
        .build();

    let run = block_on(run_workflow_summary(&workflow, &execution, &executor));
    let summary: WorkflowRunSummary = run.expect("summary workflow should run");

    assert_eq!(summary.terminal_state(), WorkflowTerminalState::Completed);

    // Both nodes were scheduled and completed; nothing failed, was cancelled, or is pending.
    assert_eq!(summary.scheduled_node_count(), 2);
    assert_eq!(summary.completed_node_count(), 2);
    assert_eq!(summary.failed_node_count(), 0);
    assert_eq!(summary.cancelled_node_count(), 0);
    assert_eq!(summary.pending_node_count(), 0);

    // No errors and no observed messages.
    assert_eq!(summary.error_count(), 0);
    assert_eq!(summary.observed_message_count(), 0);
    assert!(summary.first_error().is_none());
}
3275
/// Runs the cyclic workflow without a run policy and expects a rejection.
///
/// The error must describe the cycle and mention the feedback-loop policy, and
/// no node may have been visited.
#[test]
fn run_workflow_summary_rejects_cycle_allowed_graph_by_default() {
    let executor: RecordingExecutor = RecordingExecutor::default();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    let workflow: WorkflowDefinition = cyclic_workflow();

    let run = block_on(run_workflow_summary(&workflow, &execution, &executor));
    let err: PureflowError = run.expect_err("default run policy should reject cyclic workflow");
    let message: String = err.to_string();

    assert_eq!(err.code(), ErrorCode::NodeExecutionFailed);
    assert!(message.contains("first -> second -> first"));
    assert!(message.contains("explicit feedback-loop run policy"));
    assert!(executor.visited_contexts().is_empty());
}
3293
/// Runs the cyclic workflow under the feedback-loop policy and expects both nodes to
/// complete.
#[test]
fn run_workflow_with_feedback_loop_policy_allows_cycle_allowed_graph() {
    let loop_policy = FeedbackLoopRunPolicy::start_all_nodes_until_complete();
    let policy: WorkflowRunPolicy = WorkflowRunPolicy::feedback_loops(loop_policy);
    let executor: RecordingExecutor = RecordingExecutor::default();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    let workflow: WorkflowDefinition = cyclic_workflow();

    let run = block_on(run_workflow_with_policy_summary(
        &workflow, &execution, &executor, policy,
    ));
    let summary: WorkflowRunSummary =
        run.expect("feedback-loop policy should allow cyclic workflow");

    assert_eq!(summary.terminal_state(), WorkflowTerminalState::Completed);
    assert_eq!(summary.scheduled_node_count(), 2);
    assert_eq!(summary.completed_node_count(), 2);

    let expected_order = vec![String::from("first"), String::from("second")];
    assert_eq!(executor.visited_node_names(), expected_order);
}
3316
/// Runs the cyclic workflow with a driver on `first` and a counter on `second`.
///
/// Expects both nodes to complete and the shared log to read `counter:seed`
/// followed by `driver:ack`.
#[test]
fn feedback_loop_policy_runs_deterministic_cycle_messages() {
    let workflow: WorkflowDefinition = cyclic_workflow();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    let observed: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let registry: StaticNodeExecutorRegistry<FeedbackLoopExecutor> =
        StaticNodeExecutorRegistry::new(BTreeMap::from([
            (
                node_id("first"),
                FeedbackLoopExecutor::driver(Arc::clone(&observed)),
            ),
            (
                node_id("second"),
                FeedbackLoopExecutor::counter(Arc::clone(&observed)),
            ),
        ]));
    let policy: WorkflowRunPolicy = WorkflowRunPolicy::feedback_loops(
        FeedbackLoopRunPolicy::start_all_nodes_until_complete(),
    );

    let summary: WorkflowRunSummary = block_on(run_workflow_with_registry_policy_summary(
        &workflow, &execution, &registry, policy,
    ))
    .expect("explicit feedback-loop policy should run cyclic messages");

    assert_eq!(summary.terminal_state(), WorkflowTerminalState::Completed);
    assert_eq!(summary.completed_node_count(), 2);
    assert_eq!(summary.pending_node_count(), 0);
    // The counter logs the seed before replying, so its entry comes first.
    assert_eq!(
        *observed
            .lock()
            .expect("feedback loop observed lock should not be poisoned"),
        vec![String::from("counter:seed"), String::from("driver:ack")]
    );
}
3352
/// Runs the cyclic workflow with a failing `first` node and a `second` node that
/// waits for cancellation.
///
/// Expects a failed summary with one failed and one completed node, the failure
/// retained as first error, and the cancellation flag set.
#[test]
fn feedback_loop_policy_cancels_siblings_for_shutdown() {
    let workflow: WorkflowDefinition = cyclic_workflow();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    let cancellation_observed: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
    let registry: StaticNodeExecutorRegistry<FeedbackLoopShutdownExecutor> =
        StaticNodeExecutorRegistry::new(BTreeMap::from([
            (
                node_id("first"),
                FeedbackLoopShutdownExecutor::failing(Arc::clone(&cancellation_observed)),
            ),
            (
                node_id("second"),
                FeedbackLoopShutdownExecutor::shutdown_watcher(Arc::clone(
                    &cancellation_observed,
                )),
            ),
        ]));
    let policy: WorkflowRunPolicy = WorkflowRunPolicy::feedback_loops(
        FeedbackLoopRunPolicy::start_all_nodes_until_complete(),
    );

    let summary: WorkflowRunSummary = block_on(run_workflow_with_registry_policy_summary(
        &workflow, &execution, &registry, policy,
    ))
    .expect("feedback-loop shutdown should return summary data");

    assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
    // The watcher returns `Ok(())` once cancelled, so it counts as completed.
    assert_eq!(summary.completed_node_count(), 1);
    assert_eq!(summary.failed_node_count(), 1);
    assert_eq!(summary.pending_node_count(), 0);
    assert!(
        summary
            .first_error()
            .expect("shutdown failure should be retained")
            .to_string()
            .contains("feedback loop shutdown requested")
    );
    assert!(
        *cancellation_observed
            .lock()
            .expect("shutdown observed lock should not be poisoned")
    );
}
3397
/// Runs the cyclic workflow with executors that only wait for input, under a 1 ms
/// deadlock watchdog.
///
/// Expects a failed summary carrying a deadlock diagnostic, with both nodes
/// still pending.
#[test]
fn workflow_deadlock_watchdog_reports_stalled_cycle_state() {
    let watchdog = WorkflowWatchdogPolicy::deadlock_after(Duration::from_millis(1));
    let policy: WorkflowRunPolicy =
        WorkflowRunPolicy::feedback_loops(FeedbackLoopRunPolicy::start_all_nodes_until_complete())
            .with_watchdog(watchdog);
    let executor: WaitingInputExecutor = WaitingInputExecutor::default();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    let workflow: WorkflowDefinition = cyclic_workflow();

    let run = block_on(run_workflow_with_policy_summary(
        &workflow, &execution, &executor, policy,
    ));
    let summary: WorkflowRunSummary =
        run.expect("deadlock watchdog should report a stalled cyclic workflow as summary data");
    let diagnostic: &WorkflowDeadlockDiagnostic = summary
        .deadlock_diagnostic()
        .expect("deadlock diagnostic should be captured");
    let err_text: String = summary
        .first_error()
        .expect("deadlock should be recorded as first error")
        .to_string();

    // Summary-level view of the stall.
    assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
    assert_eq!(summary.error_count(), 1);
    assert_eq!(summary.pending_node_count(), 2);
    assert!(err_text.contains("watchdog detected no workflow progress"));

    // Diagnostic-level view of the stall.
    assert_eq!(diagnostic.pending_node_count(), 2);
    assert_eq!(diagnostic.bounded_edge_count(), 2);

    let expected_order = vec![String::from("first"), String::from("second")];
    assert_eq!(executor.visited_node_names(), expected_order);
}
3433
/// Runs two unconnected nodes with `AggregateFailureExecutor`.
///
/// Expects the run to return a summary rather than an error, with one failed
/// node and `first failed` retained as the first error.
#[test]
fn run_workflow_summary_retains_first_failure_without_returning_error() {
    let executor: AggregateFailureExecutor = AggregateFailureExecutor::default();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("first").build())
        .node(NodeBuilder::new("second").build())
        .build();

    let run = block_on(run_workflow_summary(&workflow, &execution, &executor));
    let summary: WorkflowRunSummary = run.expect("summary should preserve node failures as data");

    assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
    assert_eq!(summary.scheduled_node_count(), 2);
    assert_eq!(summary.completed_node_count(), 1);
    assert_eq!(summary.failed_node_count(), 1);
    assert_eq!(summary.cancelled_node_count(), 0);
    assert_eq!(summary.error_count(), 1);

    let first_error_text: String = summary
        .first_error()
        .expect("summary should retain first error")
        .to_string();
    assert!(first_error_text.contains("first failed"));
}
3461
/// Runs a failing two-node workflow with a metadata sink attached.
///
/// Expects one workflow-level error record (no node id, no diagnostic) and
/// exactly one node-level error record.
#[test]
fn run_workflow_with_metadata_sink_records_workflow_error_metadata() {
    let sink: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
    let executor: AggregateFailureExecutor = AggregateFailureExecutor::default();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("first").build())
        .node(NodeBuilder::new("second").build())
        .build();

    let run = block_on(run_workflow_with_metadata_sink_summary(
        &workflow,
        &execution,
        &executor,
        sink.clone(),
    ));
    let summary: WorkflowRunSummary = run.expect("summary should preserve node failures as data");

    let records: Vec<MetadataRecord> = sink.records();
    // First error record whose kind is `WorkflowFailed`.
    let workflow_error = records
        .iter()
        .filter_map(|record| match record {
            MetadataRecord::Error(error) => Some(error),
            _ => None,
        })
        .find(|error| error.kind() == ErrorMetadataKind::WorkflowFailed)
        .expect("workflow error metadata should be recorded");
    // Number of error records whose kind is `NodeFailed`.
    let node_error_count: usize = records
        .iter()
        .filter_map(|record| match record {
            MetadataRecord::Error(error) => Some(error),
            _ => None,
        })
        .filter(|error| error.kind() == ErrorMetadataKind::NodeFailed)
        .count();

    assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
    assert_eq!(workflow_error.workflow_id().as_str(), "flow");
    assert!(workflow_error.node_id().is_none());
    assert!(workflow_error.error().to_string().contains("first failed"));
    assert!(workflow_error.diagnostic().is_none());
    assert_eq!(node_error_count, 1);
}
3509
/// Runs the stalled cyclic workflow under a 1 ms deadlock watchdog with a metadata
/// sink attached.
///
/// Expects the workflow-level error record to carry a deadlock diagnostic that
/// describes the pending nodes, bounded edges, timeout and policy names.
#[test]
fn watchdog_metadata_records_deadlock_diagnostic_payload() {
    let workflow: WorkflowDefinition = cyclic_workflow();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    let executor: WaitingInputExecutor = WaitingInputExecutor::default();
    let registry: SingleNodeExecutorRegistry<'_, WaitingInputExecutor> =
        SingleNodeExecutorRegistry::new(&executor);
    let sink: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
    let policy: WorkflowRunPolicy = WorkflowRunPolicy::feedback_loops(
        FeedbackLoopRunPolicy::start_all_nodes_until_complete(),
    )
    .with_watchdog(WorkflowWatchdogPolicy::deadlock_after(
        Duration::from_millis(1),
    ));

    let summary: WorkflowRunSummary =
        block_on(run_workflow_with_registry_policy_and_metadata_sink_summary(
            &workflow,
            &execution,
            &registry,
            policy,
            sink.clone(),
        ))
        .expect("watchdog run should return summary data");
    let records: Vec<MetadataRecord> = sink.records();
    // First error record whose kind is `WorkflowFailed`.
    let workflow_error = records
        .iter()
        .find_map(|record: &MetadataRecord| match record {
            MetadataRecord::Error(error)
                if error.kind() == ErrorMetadataKind::WorkflowFailed =>
            {
                Some(error)
            }
            _ => None,
        })
        .expect("workflow error metadata should be recorded");

    assert!(summary.deadlock_diagnostic().is_some());
    match workflow_error.diagnostic() {
        Some(ErrorDiagnosticMetadata::WorkflowDeadlock(deadlock)) => {
            assert_eq!(deadlock.pending_node_count(), 2);
            assert_eq!(deadlock.bounded_edge_count(), 2);
            assert_eq!(deadlock.no_progress_timeout_ms(), 1);
            assert_eq!(deadlock.cycle_policy(), "allow_feedback_loops");
            assert_eq!(deadlock.feedback_loop_startup(), Some("start_all_nodes"));
            assert_eq!(
                deadlock.feedback_loop_termination(),
                Some("all_nodes_complete")
            );
        }
        _ => panic!("workflow error should include deadlock diagnostic metadata"),
    }
}
3563
/// Runs a source -> transform -> sink pipeline whose source executor fails.
///
/// Expects a failed summary with one failed node, two cancelled nodes, three
/// errors in total, and the source failure retained as first error.
#[test]
fn supervisor_summary_covers_failing_source_and_downstream_cancellation() {
    let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("source").output("out").build())
        .node(
            NodeBuilder::new("transform")
                .input("in")
                .output("out")
                .build(),
        )
        .node(NodeBuilder::new("sink").input("in").build())
        .edge("source", "out", "transform", "in")
        .edge("transform", "out", "sink", "in")
        .build();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    let registry: StaticNodeExecutorRegistry<FailureMatrixExecutor> =
        StaticNodeExecutorRegistry::new(BTreeMap::from([
            (node_id("source"), FailureMatrixExecutor::source_fails()),
            (
                node_id("transform"),
                FailureMatrixExecutor::passthrough_transform(),
            ),
            (node_id("sink"), FailureMatrixExecutor::sink_waits()),
        ]));

    let summary: WorkflowRunSummary = block_on(run_workflow_with_registry_summary(
        &workflow, &execution, &registry,
    ))
    .expect("summary should preserve source failure as data");

    assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
    assert_eq!(summary.scheduled_node_count(), 3);
    assert_eq!(summary.completed_node_count(), 0);
    assert_eq!(summary.failed_node_count(), 1);
    assert_eq!(summary.cancelled_node_count(), 2);
    assert_eq!(summary.pending_node_count(), 0);
    assert_eq!(summary.error_count(), 3);
    assert!(
        summary
            .first_error()
            .expect("summary should retain source failure")
            .to_string()
            .contains("matrix source failed")
    );
}
3609
/// Runs a three-node chain (`source` -> `transform` -> `sink`) whose `transform`
/// node is registered with `FailureMatrixExecutor::transform_fails()`, recording
/// metadata into a `RecordingMetadataSink`.
///
/// Checks both the summary tallies (one completed, one failed, one cancelled,
/// none pending, two errors) and the recorded metadata: exactly two
/// `NodeFailed` error records, plus a `WorkflowFailed` error record that carries
/// no node id and whose error text contains `"matrix transform failed"`.
#[test]
fn supervisor_summary_covers_failing_transform_and_error_metadata() {
    let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("source").output("out").build())
        .node(
            NodeBuilder::new("transform")
                .input("in")
                .output("out")
                .build(),
        )
        .node(NodeBuilder::new("sink").input("in").build())
        .edge("source", "out", "transform", "in")
        .edge("transform", "out", "sink", "in")
        .build();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    let registry: StaticNodeExecutorRegistry<FailureMatrixExecutor> =
        StaticNodeExecutorRegistry::new(BTreeMap::from([
            (
                node_id("source"),
                FailureMatrixExecutor::source_for_transform_failure(),
            ),
            (
                node_id("transform"),
                FailureMatrixExecutor::transform_fails(),
            ),
            (node_id("sink"), FailureMatrixExecutor::sink_waits()),
        ]));
    // The test keeps its own handle so it can read the records back after the run.
    let sink: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());

    let summary: WorkflowRunSummary =
        block_on(run_workflow_with_registry_and_metadata_sink_summary(
            &workflow,
            &execution,
            &registry,
            sink.clone(),
        ))
        .expect("summary should preserve transform failure as data");
    let records = sink.records();
    // Count every error record tagged as a node-level failure.
    let node_error_count = records
        .iter()
        .filter(|record| {
            matches!(
                record,
                MetadataRecord::Error(error)
                    if error.kind() == ErrorMetadataKind::NodeFailed
            )
        })
        .count();
    // Pick the first error record tagged as a workflow-level failure.
    let workflow_error = records
        .iter()
        .find_map(|record| match record {
            MetadataRecord::Error(error)
                if error.kind() == ErrorMetadataKind::WorkflowFailed =>
            {
                Some(error)
            }
            _ => None,
        })
        .expect("workflow error metadata should be recorded");

    assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
    assert_eq!(summary.completed_node_count(), 1);
    assert_eq!(summary.failed_node_count(), 1);
    assert_eq!(summary.cancelled_node_count(), 1);
    assert_eq!(summary.pending_node_count(), 0);
    assert_eq!(summary.error_count(), 2);
    assert!(
        workflow_error
            .error()
            .to_string()
            .contains("matrix transform failed")
    );
    assert!(workflow_error.node_id().is_none());
    assert_eq!(node_error_count, 2);
}
3685
/// Runs a two-node workflow (`source` -> `sink`) using the executor pair from
/// `DisconnectedDownstreamExecutor::pair()` and checks how the summary reports
/// the resulting send failure.
///
/// Expected summary: terminal state `Failed`, one completed node, one failed
/// node, no cancelled or pending nodes, one recorded error, and a first error
/// whose text contains `"disconnected"`.
#[test]
fn supervisor_summary_covers_disconnected_downstream_send_failure() {
    let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("source").output("out").build())
        .node(NodeBuilder::new("sink").input("in").build())
        .edge("source", "out", "sink", "in")
        .build();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    // `pair()` yields the two executors that are registered for the two nodes.
    let (source, sink) = DisconnectedDownstreamExecutor::pair();
    let registry: StaticNodeExecutorRegistry<DisconnectedDownstreamExecutor> =
        StaticNodeExecutorRegistry::new(BTreeMap::from([
            (node_id("source"), source),
            (node_id("sink"), sink),
        ]));

    let summary: WorkflowRunSummary = block_on(run_workflow_with_registry_summary(
        &workflow, &execution, &registry,
    ))
    .expect("summary should preserve disconnected send failure as data");

    assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
    assert_eq!(summary.completed_node_count(), 1);
    assert_eq!(summary.failed_node_count(), 1);
    assert_eq!(summary.cancelled_node_count(), 0);
    assert_eq!(summary.pending_node_count(), 0);
    assert_eq!(summary.error_count(), 1);
    assert!(
        summary
            .first_error()
            .expect("summary should retain disconnected send error")
            .to_string()
            .contains("disconnected")
    );
}
3720
/// Runs a fan-out workflow in which `source.out` feeds both `good.in` and
/// `drop.in`, using the registry built by
/// `FanOutPartialFailureExecutor::registry()`.
///
/// Checks the summary tallies (terminal state `Failed`, two completed, one
/// failed, none cancelled or pending, one error containing `"disconnected"`)
/// and that the observation recorded by the `good` executor is one of the
/// three `*_without_packet` outcomes, i.e. it never saw a partially delivered
/// fan-out packet.
#[test]
fn supervisor_rejects_fan_out_partial_send_after_one_downstream_disconnects() {
    let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("source").output("out").build())
        .node(NodeBuilder::new("good").input("in").build())
        .node(NodeBuilder::new("drop").input("in").build())
        .edge("source", "out", "good", "in")
        .edge("source", "out", "drop", "in")
        .build();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    let registry = FanOutPartialFailureExecutor::registry();

    let summary: WorkflowRunSummary = block_on(run_workflow_with_registry_summary(
        &workflow, &execution, &registry,
    ))
    .expect("summary should preserve fan-out send failure as data");
    // Read back what the executor registered for `good` recorded during the run.
    let good_sink_observation = registry
        .executors()
        .get(&node_id("good"))
        .expect("good sink executor should be registered")
        .good_sink_observation()
        .expect("good sink should record cancellation or closure");

    assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
    assert_eq!(summary.completed_node_count(), 2);
    assert_eq!(summary.failed_node_count(), 1);
    assert_eq!(summary.cancelled_node_count(), 0);
    assert_eq!(summary.pending_node_count(), 0);
    assert_eq!(summary.error_count(), 1);
    assert!(
        summary
            .first_error()
            .expect("summary should retain fan-out send error")
            .to_string()
            .contains("disconnected")
    );
    // Any of the three packet-free outcomes is accepted.
    assert!(
        good_sink_observation == "cancelled_without_packet"
            || good_sink_observation == "disconnected_without_packet"
            || good_sink_observation == "closed_without_packet",
        "good sink must not receive a partial fan-out packet: {good_sink_observation}"
    );
}
3764
/// A single-node workflow executed with `CancelledExecutor` must produce a
/// summary whose terminal state is `Cancelled`, with the lone node counted as
/// scheduled and cancelled (not completed or failed) and one error recorded.
#[test]
fn run_workflow_summary_reports_cancellation_terminal_state() {
    let single_node_flow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("node").build())
        .build();
    let run_metadata: ExecutionMetadata = execution_metadata("run-1");
    let cancelling: CancelledExecutor = CancelledExecutor;

    // Cancellation has to come back inside an `Ok` summary, not as an `Err`.
    let run_result = block_on(run_workflow_summary(
        &single_node_flow,
        &run_metadata,
        &cancelling,
    ));
    let outcome: WorkflowRunSummary =
        run_result.expect("summary should preserve cancellation as data");

    assert_eq!(outcome.terminal_state(), WorkflowTerminalState::Cancelled);
    assert_eq!(outcome.scheduled_node_count(), 1);
    assert_eq!(outcome.completed_node_count(), 0);
    assert_eq!(outcome.failed_node_count(), 0);
    assert_eq!(outcome.cancelled_node_count(), 1);
    assert_eq!(outcome.error_count(), 1);
}
3784
/// Runs a workflow consisting of one `source` node with an `out` port against
/// `source_output_contracts()`, using `ContractOutputExecutor::matching_source()`.
///
/// The run must finish as `Completed` with one completed node and no first
/// error.
#[test]
fn run_workflow_with_contracts_summary_accepts_matching_output_source() {
    let source_only_flow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("source").output("out").build())
        .build();
    let run_metadata: ExecutionMetadata = execution_metadata("run-1");
    let conforming_source: ContractOutputExecutor = ContractOutputExecutor::matching_source();
    // Named binding so the contracts outlive the whole run expression.
    let contracts = source_output_contracts();

    let outcome: WorkflowRunSummary = block_on(run_workflow_with_contracts_summary(
        &source_only_flow,
        &run_metadata,
        &conforming_source,
        &contracts,
    ))
    .expect("contract-aware workflow should run");

    assert_eq!(outcome.terminal_state(), WorkflowTerminalState::Completed);
    assert_eq!(outcome.completed_node_count(), 1);
    assert!(outcome.first_error().is_none());
}
3805
/// Runs a workflow consisting of one `source` node with an `out` port against
/// `source_output_contracts()`, using
/// `ContractOutputExecutor::mismatched_source()`.
///
/// The summary must still be returned as `Ok`, report `Failed` with no
/// completed node and one failed node, and keep a first error whose text
/// contains `"does not match output"`.
#[test]
fn run_workflow_with_contracts_summary_rejects_mismatched_output_source() {
    let source_only_flow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("source").output("out").build())
        .build();
    let run_metadata: ExecutionMetadata = execution_metadata("run-1");
    let violating_source: ContractOutputExecutor = ContractOutputExecutor::mismatched_source();
    // Named binding so the contracts outlive the whole run expression.
    let contracts = source_output_contracts();

    let outcome: WorkflowRunSummary = block_on(run_workflow_with_contracts_summary(
        &source_only_flow,
        &run_metadata,
        &violating_source,
        &contracts,
    ))
    .expect("contract-aware summary should preserve output validation failures as data");

    assert_eq!(outcome.terminal_state(), WorkflowTerminalState::Failed);
    assert_eq!(outcome.completed_node_count(), 0);
    assert_eq!(outcome.failed_node_count(), 1);

    let first_error_text: String = outcome
        .first_error()
        .expect("summary should retain output validation error")
        .to_string();
    assert!(first_error_text.contains("does not match output"));
}
3833
/// Runs a `source` -> `sink` workflow where the `source` node is a
/// `BatchNodeExecutor` wrapping `ContractBatchExecutor::matching_source()` and
/// the `sink` node is a `RecordingSinkExecutor` sharing the `received` buffer.
///
/// With `source_output_contracts()` applied, the run must complete and the
/// shared buffer must hold exactly one payload, `b"contracted"`.
#[test]
fn batch_node_executor_sends_matching_outputs_through_graph_edges() {
    let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("source").output("out").build())
        .node(NodeBuilder::new("sink").input("in").build())
        .edge("source", "out", "sink", "in")
        .build();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    // Shared between the test and the sink executor so the test can read the
    // payloads back after the run.
    let received: Arc<Mutex<Vec<Vec<u8>>>> = Arc::new(Mutex::new(Vec::new()));
    let registry: StaticNodeExecutorRegistry<ContractBatchRegistryExecutor> =
        StaticNodeExecutorRegistry::new(BTreeMap::from([
            (
                node_id("source"),
                ContractBatchRegistryExecutor::Batch(BatchNodeExecutor::new(
                    ContractBatchExecutor::matching_source(),
                )),
            ),
            (
                node_id("sink"),
                ContractBatchRegistryExecutor::Sink(RecordingSinkExecutor::new(Arc::clone(
                    &received,
                ))),
            ),
        ]));

    let summary: WorkflowRunSummary = block_on(run_workflow_with_registry_contracts_summary(
        &workflow,
        &execution,
        &registry,
        &source_output_contracts(),
    ))
    .expect("batch workflow should run through output contracts");

    assert_eq!(summary.terminal_state(), WorkflowTerminalState::Completed);
    assert_eq!(
        *received
            .lock()
            .expect("batch sink received lock should not be poisoned"),
        vec![b"contracted".to_vec()]
    );
}
3875
/// Runs a `source` -> `sink` workflow where the `source` node is a
/// `BatchNodeExecutor` wrapping `ContractBatchExecutor::mismatched_source()` and
/// the `sink` node is a `RecordingSinkExecutor` sharing the `received` buffer.
///
/// With `source_output_contracts()` applied, the summary must report `Failed`
/// with a first error containing `"does not match output"`, and the shared
/// buffer must stay empty, so the sink recorded no payload.
#[test]
fn batch_node_executor_rejects_mismatched_output_before_sink_observes_it() {
    let workflow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("source").output("out").build())
        .node(NodeBuilder::new("sink").input("in").build())
        .edge("source", "out", "sink", "in")
        .build();
    let execution: ExecutionMetadata = execution_metadata("run-1");
    // Shared between the test and the sink executor so the test can confirm
    // that nothing was recorded.
    let received: Arc<Mutex<Vec<Vec<u8>>>> = Arc::new(Mutex::new(Vec::new()));
    let registry: StaticNodeExecutorRegistry<ContractBatchRegistryExecutor> =
        StaticNodeExecutorRegistry::new(BTreeMap::from([
            (
                node_id("source"),
                ContractBatchRegistryExecutor::Batch(BatchNodeExecutor::new(
                    ContractBatchExecutor::mismatched_source(),
                )),
            ),
            (
                node_id("sink"),
                ContractBatchRegistryExecutor::Sink(RecordingSinkExecutor::new(Arc::clone(
                    &received,
                ))),
            ),
        ]));

    let summary: WorkflowRunSummary = block_on(run_workflow_with_registry_contracts_summary(
        &workflow,
        &execution,
        &registry,
        &source_output_contracts(),
    ))
    .expect("batch output validation failures should be preserved as summary data");

    assert_eq!(summary.terminal_state(), WorkflowTerminalState::Failed);
    assert!(
        summary
            .first_error()
            .expect("summary should retain output validation error")
            .to_string()
            .contains("does not match output")
    );
    assert!(
        received
            .lock()
            .expect("batch sink received lock should not be poisoned")
            .is_empty()
    );
}
3924
/// Runs a single `source` node (declaring only the `out` port) with a
/// `BatchNodeExecutor` wrapping `ContractBatchExecutor::unknown_port()`.
///
/// The summary must report `Failed` and keep a first error whose text contains
/// ``output port `rogue` is not declared``.
#[test]
fn batch_node_executor_rejects_undeclared_output_ports() {
    let source_only_flow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("source").output("out").build())
        .build();
    let run_metadata: ExecutionMetadata = execution_metadata("run-1");
    let rogue_batch: BatchNodeExecutor<ContractBatchExecutor> =
        BatchNodeExecutor::new(ContractBatchExecutor::unknown_port());

    let run_result = block_on(run_workflow_summary(
        &source_only_flow,
        &run_metadata,
        &rogue_batch,
    ));
    let outcome: WorkflowRunSummary =
        run_result.expect("batch output validation failures should be summary data");

    assert_eq!(outcome.terminal_state(), WorkflowTerminalState::Failed);

    let first_error_text: String = outcome
        .first_error()
        .expect("summary should retain unknown output error")
        .to_string();
    assert!(first_error_text.contains("output port `rogue` is not declared"));
}
3947
/// `run_workflow` on a single-node workflow driven by
/// `FailingExecutor::execution("boom")` must return an error whose code is
/// `ErrorCode::NodeExecutionFailed`.
#[test]
fn run_workflow_propagates_executor_failures() {
    let single_node_flow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("first").build())
        .build();
    let run_metadata: ExecutionMetadata = execution_metadata("run-1");
    let failing: FailingExecutor = FailingExecutor::execution("boom");

    let run_result = block_on(run_workflow(&single_node_flow, &run_metadata, &failing));
    let failure = run_result.expect_err("workflow should surface executor failures");

    assert_eq!(failure.code(), ErrorCode::NodeExecutionFailed);
}
3961
/// Runs two unconnected nodes (`source` with output `out`, `sink` with input
/// `in`) through a `RecordingExecutor` and compares the port names it recorded
/// per visit against the declared ports, in declaration order.
#[test]
fn run_workflow_passes_declared_node_ports_to_executor() {
    let two_node_flow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("source").output("out").build())
        .node(NodeBuilder::new("sink").input("in").build())
        .build();
    let run_metadata: ExecutionMetadata = execution_metadata("run-1");
    let recorder: RecordingExecutor = RecordingExecutor::default();

    block_on(run_workflow(&two_node_flow, &run_metadata, &recorder))
        .expect("workflow should run");

    // First entry belongs to `source`, second to `sink`.
    let expected_inputs: Vec<Vec<String>> = vec![Vec::new(), vec![String::from("in")]];
    let expected_outputs: Vec<Vec<String>> = vec![vec![String::from("out")], Vec::new()];

    assert_eq!(recorder.visited_input_port_names(), expected_inputs);
    assert_eq!(recorder.visited_output_port_names(), expected_outputs);
}
3982
/// Runs a `source` -> `sink` workflow through a `ChannelExecutor` and checks
/// that the payloads it reports as received are `b"hello"` followed by
/// `b"world"`.
#[test]
fn run_workflow_wires_edges_as_bounded_port_channels() {
    let linked_flow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("source").output("out").build())
        .node(NodeBuilder::new("sink").input("in").build())
        .edge("source", "out", "sink", "in")
        .build();
    let run_metadata: ExecutionMetadata = execution_metadata("run-1");
    let channel_executor: ChannelExecutor = ChannelExecutor::default();

    block_on(run_workflow(&linked_flow, &run_metadata, &channel_executor))
        .expect("workflow should run");

    let expected_payloads: Vec<Vec<u8>> = vec![b"hello".to_vec(), b"world".to_vec()];
    assert_eq!(channel_executor.received_payloads(), expected_payloads);
}
4000
/// Builds a `source` -> `probe` workflow via `WorkflowDefinition::from_parts`
/// with an edge created by `EdgeDefinition::with_capacity` using capacity 3,
/// runs it through a `CapacityProbeExecutor`, and checks that the executor
/// observed exactly that capacity.
#[test]
fn run_workflow_uses_explicit_edge_capacity() {
    let edge_capacity: NonZeroUsize = NonZeroUsize::new(3).expect("nonzero");
    let upstream = pureflow_workflow::EdgeEndpoint::new(node_id("source"), port_id("out"));
    let downstream = pureflow_workflow::EdgeEndpoint::new(node_id("probe"), port_id("in"));

    let probed_flow: WorkflowDefinition = WorkflowDefinition::from_parts(
        workflow_id("flow"),
        [
            NodeBuilder::new("source").output("out").build(),
            NodeBuilder::new("probe").input("in").build(),
        ],
        [EdgeDefinition::with_capacity(
            upstream,
            downstream,
            edge_capacity,
        )],
    )
    .expect("workflow should be valid");
    let run_metadata: ExecutionMetadata = execution_metadata("run-1");
    let probe: CapacityProbeExecutor = CapacityProbeExecutor::default();

    block_on(run_workflow(&probed_flow, &run_metadata, &probe)).expect("workflow should run");

    assert_eq!(
        probe.observed_capacities(),
        vec![Some(edge_capacity.get())]
    );
}
4026
/// Runs a `source` -> `sink` workflow whose single edge has capacity 1 through
/// a `BoundedBackpressureExecutor`.
///
/// The executor's event log must contain both `"source-observed-full-edge"` and
/// `"source-second-send-completed"`, and the received payloads must be
/// `b"first"` followed by `b"second"`.
#[test]
fn run_workflow_backpressure_blocks_until_downstream_receives() {
    let single_slot: NonZeroUsize = NonZeroUsize::new(1).expect("nonzero");
    let tight_flow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("source").output("out").build())
        .node(NodeBuilder::new("sink").input("in").build())
        .edge_with_capacity("source", "out", "sink", "in", single_slot)
        .build();
    let run_metadata: ExecutionMetadata = execution_metadata("run-1");
    let backpressure: BoundedBackpressureExecutor = BoundedBackpressureExecutor::default();

    block_on(run_workflow(&tight_flow, &run_metadata, &backpressure))
        .expect("workflow should run");

    let events: Vec<String> = backpressure.events();
    // True when the event log contains an entry equal to `label`.
    let was_recorded =
        |label: &str| -> bool { events.iter().any(|event: &String| event.as_str() == label) };

    assert!(was_recorded("source-observed-full-edge"));
    assert!(was_recorded("source-second-send-completed"));

    let expected_payloads: Vec<Vec<u8>> = vec![b"first".to_vec(), b"second".to_vec()];
    assert_eq!(backpressure.received_payloads(), expected_payloads);
}
4061
/// Runs a workflow in which `source.out` feeds both `left.in` and `right.in`
/// through a `FanOutExecutor`, then checks that the per-node receive map holds
/// the single payload `b"fan"` for each of `left` and `right`.
#[test]
fn run_workflow_fans_out_one_output_to_all_downstream_inputs() {
    let fan_out_flow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("source").output("out").build())
        .node(NodeBuilder::new("left").input("in").build())
        .node(NodeBuilder::new("right").input("in").build())
        .edge("source", "out", "left", "in")
        .edge("source", "out", "right", "in")
        .build();
    let run_metadata: ExecutionMetadata = execution_metadata("run-1");
    let fan_out: FanOutExecutor = FanOutExecutor::default();

    block_on(run_workflow(&fan_out_flow, &run_metadata, &fan_out))
        .expect("workflow should run");

    let deliveries: BTreeMap<String, Vec<Vec<u8>>> = fan_out.received_by_node();
    let expected_delivery: Vec<Vec<u8>> = vec![b"fan".to_vec()];

    assert_eq!(deliveries.get("left"), Some(&expected_delivery));
    assert_eq!(deliveries.get("right"), Some(&expected_delivery));
}
4080
/// Runs a workflow in which `left.out` and `right.out` both feed
/// `collector.in` through a `FanInClosureExecutor`.
///
/// The received payloads, compared after sorting, must be `b"left"` and
/// `b"right"`, and the executor must report that it observed closure.
#[test]
fn run_workflow_fans_in_and_propagates_upstream_closure() {
    let fan_in_flow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("left").output("out").build())
        .node(NodeBuilder::new("right").output("out").build())
        .node(NodeBuilder::new("collector").input("in").build())
        .edge("left", "out", "collector", "in")
        .edge("right", "out", "collector", "in")
        .build();
    let run_metadata: ExecutionMetadata = execution_metadata("run-1");
    let fan_in: FanInClosureExecutor = FanInClosureExecutor::default();

    block_on(run_workflow(&fan_in_flow, &run_metadata, &fan_in)).expect("workflow should run");

    // Sorted before comparing, so the check does not depend on arrival order.
    let mut collected: Vec<Vec<u8>> = fan_in.received_payloads();
    collected.sort_unstable();
    let expected_payloads: Vec<Vec<u8>> = vec![b"left".to_vec(), b"right".to_vec()];

    assert_eq!(collected, expected_payloads);
    assert!(fan_in.closure_observed());
}
4100
/// Runs two unconnected nodes (`first`, `second`) through an
/// `AggregateFailureExecutor`.
///
/// `run_workflow` must return an error with code
/// `ErrorCode::NodeExecutionFailed`, and the executor must report having
/// visited `first` and then `second`.
#[test]
fn run_workflow_aggregates_terminal_results_after_polling_all_nodes() {
    let two_node_flow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("first").build())
        .node(NodeBuilder::new("second").build())
        .build();
    let run_metadata: ExecutionMetadata = execution_metadata("run-1");
    let aggregate: AggregateFailureExecutor = AggregateFailureExecutor::default();

    let run_result = block_on(run_workflow(&two_node_flow, &run_metadata, &aggregate));
    let failure = run_result.expect_err("workflow should surface executor failures");

    assert_eq!(failure.code(), ErrorCode::NodeExecutionFailed);

    let expected_visits: Vec<String> = ["first", "second"]
        .iter()
        .copied()
        .map(String::from)
        .collect();
    assert_eq!(aggregate.visited_node_names(), expected_visits);
}
4119
/// Runs a workflow in which `fail.out` feeds `worker.in` through a
/// `SiblingCancellationExecutor`.
///
/// `run_workflow` must return an error with code
/// `ErrorCode::NodeExecutionFailed`, and the executor must report that
/// cancellation was observed.
#[test]
fn run_workflow_cancels_siblings_after_first_node_failure() {
    let sibling_flow: WorkflowDefinition = WorkflowBuilder::new("flow")
        .node(NodeBuilder::new("worker").input("in").build())
        .node(NodeBuilder::new("fail").output("out").build())
        .edge("fail", "out", "worker", "in")
        .build();
    let run_metadata: ExecutionMetadata = execution_metadata("run-1");
    let siblings: SiblingCancellationExecutor = SiblingCancellationExecutor::default();

    let run_result = block_on(run_workflow(&sibling_flow, &run_metadata, &siblings));
    let failure = run_result.expect_err("workflow should surface the first node failure");

    assert_eq!(failure.code(), ErrorCode::NodeExecutionFailed);
    assert!(siblings.cancellation_observed());
}
4136}