1use std::sync::Arc;
9
10use actionqueue_executor_local::handler::{ExecutorContext, HandlerOutput};
11use serde::{Deserialize, Serialize};
12use worldinterface_connector::execute_transform;
13use worldinterface_connector::invoke_with_receipt;
14use worldinterface_connector::{
15 CancellationToken, ConnectorError, ConnectorRegistry, InvocationContext,
16};
17use worldinterface_contextstore::{AtomicWriter, ContextStore, ContextStoreError};
18use worldinterface_core::flowspec::branch::BranchNode;
19use worldinterface_core::flowspec::{ConnectorNode, NodeType, TransformNode};
20use worldinterface_core::id::NodeId;
21use worldinterface_core::metrics::MetricsRecorder;
22use worldinterface_flowspec::id::derive_step_run_id;
23use worldinterface_flowspec::payload::StepPayload;
24
25use crate::error::{ResolveError, StepError};
26use crate::resolve::resolve_params;
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct BranchResult {
31 pub taken: bool,
33 pub then_target: NodeId,
35 pub else_target: Option<NodeId>,
37}
38
39pub fn execute_step<S: ContextStore>(
43 ctx: &ExecutorContext,
44 payload: &StepPayload,
45 registry: &ConnectorRegistry,
46 store: &Arc<S>,
47 metrics: &dyn MetricsRecorder,
48) -> HandlerOutput {
49 match step_inner(ctx, payload, registry, store, metrics) {
50 Ok(output) => output,
51 Err(e) => map_step_error(e),
52 }
53}
54
55fn step_inner<S: ContextStore>(
56 ctx: &ExecutorContext,
57 payload: &StepPayload,
58 registry: &ConnectorRegistry,
59 store: &Arc<S>,
60 metrics: &dyn MetricsRecorder,
61) -> Result<HandlerOutput, StepError> {
62 let step_type = match &payload.node_type {
63 NodeType::Connector(_) => "connector",
64 NodeType::Transform(_) => "transform",
65 NodeType::Branch(_) => "branch",
66 };
67 let _span = tracing::info_span!(
68 "step_run",
69 flow_run_id = %payload.flow_run_id,
70 node_id = %payload.node_id,
71 step_type = step_type,
72 )
73 .entered();
74
75 match &payload.node_type {
76 NodeType::Connector(connector_node) => {
77 execute_connector_step(ctx, payload, connector_node, registry, store, metrics)
78 }
79 NodeType::Transform(transform_node) => {
80 execute_transform_step(payload, transform_node, store, metrics)
81 }
82 NodeType::Branch(branch_node) => execute_branch_step(payload, branch_node, store, metrics),
83 }
84}
85
86fn execute_connector_step<S: ContextStore>(
88 ctx: &ExecutorContext,
89 payload: &StepPayload,
90 connector_node: &ConnectorNode,
91 registry: &ConnectorRegistry,
92 store: &Arc<S>,
93 metrics: &dyn MetricsRecorder,
94) -> Result<HandlerOutput, StepError> {
95 if let Some(_existing) = store.get(payload.flow_run_id, payload.node_id)? {
97 tracing::info!(
98 %payload.flow_run_id, %payload.node_id,
99 "output already exists in ContextStore (idempotent retry), skipping connector invocation"
100 );
101 return Ok(HandlerOutput::success());
102 }
103
104 let connector = registry
106 .get(&connector_node.connector)
107 .ok_or_else(|| StepError::ConnectorNotFound { name: connector_node.connector.clone() })?;
108
109 let resolved_params = resolve_params(
111 &connector_node.params,
112 payload.flow_run_id,
113 payload.flow_params.as_ref(),
114 store.as_ref(),
115 )?;
116
117 let inv_ctx = build_invocation_context(ctx, payload);
119
120 metrics.record_connector_invocation(&connector_node.connector);
122
123 let (result, receipt) = invoke_with_receipt(connector.as_ref(), &inv_ctx, &resolved_params);
125
126 match result {
127 Ok(output) => {
128 let writer = AtomicWriter::new(Arc::clone(store));
130 writer
131 .write_and_complete(payload.flow_run_id, payload.node_id, &output, || Ok(()))
132 .map_err(|e| match e {
133 worldinterface_contextstore::AtomicWriteError::WriteFailed(store_err) => {
134 StepError::ContextStoreError(store_err)
135 }
136 worldinterface_contextstore::AtomicWriteError::CompletionFailed(e) => {
137 StepError::ContextStoreError(ContextStoreError::StorageError(e.to_string()))
138 }
139 })?;
140
141 metrics.record_contextstore_write();
143
144 let receipt_key = format!("receipt:step:{}:{}", payload.flow_run_id, payload.node_id);
146 if let Ok(receipt_value) = serde_json::to_value(&receipt) {
147 let _ = store.upsert_global(&receipt_key, &receipt_value);
148 }
149
150 metrics.record_step_completed(
152 &connector_node.connector,
153 receipt.duration_ms as f64 / 1000.0,
154 );
155
156 Ok(HandlerOutput::success())
157 }
158 Err(e) => {
159 let receipt_key = format!("receipt:step:{}:{}", payload.flow_run_id, payload.node_id);
162 if let Ok(receipt_value) = serde_json::to_value(&receipt) {
163 let _ = store.upsert_global(&receipt_key, &receipt_value);
164 }
165
166 metrics.record_step_failed(&connector_node.connector);
168
169 Err(StepError::ConnectorError(e))
170 }
171 }
172}
173
174fn execute_transform_step<S: ContextStore>(
176 payload: &StepPayload,
177 transform_node: &TransformNode,
178 store: &Arc<S>,
179 metrics: &dyn MetricsRecorder,
180) -> Result<HandlerOutput, StepError> {
181 if store.get(payload.flow_run_id, payload.node_id)?.is_some() {
183 tracing::info!(
184 %payload.flow_run_id, %payload.node_id,
185 "transform output already exists (idempotent retry), skipping"
186 );
187 return Ok(HandlerOutput::success());
188 }
189
190 let resolved_input = resolve_params(
192 &transform_node.input,
193 payload.flow_run_id,
194 payload.flow_params.as_ref(),
195 store.as_ref(),
196 )?;
197
198 let output = execute_transform(&transform_node.transform, &resolved_input)?;
200
201 store.put(payload.flow_run_id, payload.node_id, &output).or_else(|e| match e {
203 ContextStoreError::AlreadyExists { .. } => Ok(()),
204 other => Err(other),
205 })?;
206
207 metrics.record_contextstore_write();
209
210 Ok(HandlerOutput::success())
211}
212
213fn execute_branch_step<S: ContextStore>(
215 payload: &StepPayload,
216 branch_node: &BranchNode,
217 store: &Arc<S>,
218 metrics: &dyn MetricsRecorder,
219) -> Result<HandlerOutput, StepError> {
220 if store.get(payload.flow_run_id, payload.node_id)?.is_some() {
222 tracing::info!(
223 %payload.flow_run_id, %payload.node_id,
224 "branch result already exists (idempotent retry), skipping"
225 );
226 return Ok(HandlerOutput::success());
227 }
228
229 let taken = crate::branch_eval::evaluate_branch(
231 &branch_node.condition,
232 payload.flow_run_id,
233 payload.flow_params.as_ref(),
234 store.as_ref(),
235 )?;
236
237 let branch_result = BranchResult {
238 taken,
239 then_target: branch_node.then_edge,
240 else_target: branch_node.else_edge,
241 };
242
243 let value = serde_json::to_value(&branch_result).map_err(|e| {
245 StepError::ContextStoreError(ContextStoreError::StorageError(e.to_string()))
246 })?;
247
248 store.put(payload.flow_run_id, payload.node_id, &value).or_else(|e| match e {
249 ContextStoreError::AlreadyExists { .. } => Ok(()),
250 other => Err(other),
251 })?;
252
253 metrics.record_contextstore_write();
255
256 Ok(HandlerOutput::success())
257}
258
259fn build_invocation_context(ctx: &ExecutorContext, payload: &StepPayload) -> InvocationContext {
261 let step_run_id = derive_step_run_id(payload.flow_run_id, payload.node_id);
262
263 let cancellation =
267 CancellationToken::from_flag(ctx.input.cancellation_context.token().cancelled_flag());
268
269 InvocationContext {
270 flow_run_id: payload.flow_run_id,
271 node_id: payload.node_id,
272 step_run_id,
273 run_id: *ctx.input.run_id.as_uuid(),
274 attempt_id: *ctx.input.attempt_id.as_uuid(),
275 attempt_number: ctx.input.metadata.attempt_number,
276 cancellation,
277 }
278}
279
280fn map_step_error(err: StepError) -> HandlerOutput {
282 match &err {
283 StepError::ConnectorError(ConnectorError::Retryable(_)) => {
284 HandlerOutput::retryable_failure(err.to_string())
285 }
286 StepError::ConnectorError(ConnectorError::Cancelled) => {
287 HandlerOutput::retryable_failure(err.to_string())
288 }
289 StepError::ConnectorError(ConnectorError::Terminal(_))
290 | StepError::ConnectorError(ConnectorError::InvalidParams(_)) => {
291 HandlerOutput::terminal_failure(err.to_string())
292 }
293 StepError::ResolveFailed(ResolveError::NodeOutputNotFound { .. }) => {
294 HandlerOutput::retryable_failure(err.to_string())
295 }
296 StepError::ResolveFailed(ResolveError::ContextStoreError(
297 ContextStoreError::StorageError(_),
298 )) => HandlerOutput::retryable_failure(err.to_string()),
299 StepError::ContextStoreError(ContextStoreError::StorageError(_)) => {
300 HandlerOutput::retryable_failure(err.to_string())
301 }
302 _ => HandlerOutput::terminal_failure(err.to_string()),
303 }
304}
305
#[cfg(test)]
mod tests {
    //! Unit tests for step execution: connector, transform and branch steps,
    //! idempotent retries, invocation-context construction, receipts, and the
    //! `step_run` tracing span.

    use std::io;
    use std::sync::Mutex;

    use actionqueue_core::ids::{AttemptId, RunId};
    use actionqueue_core::task::safety::SafetyLevel;
    use actionqueue_executor_local::handler::{AttemptMetadata, CancellationContext, HandlerInput};
    use serde_json::json;
    use tracing_subscriber::fmt;
    use tracing_subscriber::prelude::*;
    use worldinterface_connector::connectors::default_registry;
    use worldinterface_contextstore::SqliteContextStore;
    use worldinterface_core::flowspec::branch::{BranchCondition, ParamRef};
    use worldinterface_core::flowspec::transform::TransformType;
    use worldinterface_core::id::FlowRunId;
    use worldinterface_core::metrics::NoopMetricsRecorder;

    use super::*;

    static NOOP_METRICS: NoopMetricsRecorder = NoopMetricsRecorder;

    // ------------------------------------------------------------------
    // Fixtures
    // ------------------------------------------------------------------

    /// Fresh in-memory SQLite-backed context store.
    fn make_store() -> Arc<SqliteContextStore> {
        Arc::new(SqliteContextStore::in_memory().unwrap())
    }

    /// Executor context for a first attempt (of three) with no timeout.
    fn make_executor_context() -> ExecutorContext {
        let metadata = AttemptMetadata {
            max_attempts: 3,
            attempt_number: 1,
            timeout_secs: None,
            safety_level: SafetyLevel::Idempotent,
        };
        let input = HandlerInput {
            run_id: RunId::new(),
            attempt_id: AttemptId::new(),
            payload: vec![],
            metadata,
            cancellation_context: CancellationContext::new(),
        };
        ExecutorContext { input, submission: None, children: None }
    }

    /// Step payload with no flow params.
    fn make_step_payload(
        flow_run_id: FlowRunId,
        node_id: NodeId,
        node_type: NodeType,
    ) -> StepPayload {
        StepPayload {
            task_type: worldinterface_flowspec::payload::TaskType::Step,
            flow_run_id,
            node_id,
            node_type,
            flow_params: None,
        }
    }

    /// A fresh `(flow_run_id, node_id)` pair.
    fn fresh_ids() -> (FlowRunId, NodeId) {
        (FlowRunId::new(), NodeId::new())
    }

    /// Payload for a connector node with no idempotency config.
    fn connector_payload(
        fr: FlowRunId,
        node_id: NodeId,
        connector: &str,
        params: serde_json::Value,
    ) -> StepPayload {
        make_step_payload(
            fr,
            node_id,
            NodeType::Connector(ConnectorNode {
                connector: connector.into(),
                params,
                idempotency_config: None,
            }),
        )
    }

    /// Payload for a transform node.
    fn transform_payload(
        fr: FlowRunId,
        node_id: NodeId,
        transform: TransformType,
        input: serde_json::Value,
    ) -> StepPayload {
        make_step_payload(fr, node_id, NodeType::Transform(TransformNode { transform, input }))
    }

    /// Payload for a branch node, with optional flow params attached.
    fn branch_payload(
        fr: FlowRunId,
        node_id: NodeId,
        condition: BranchCondition,
        then_edge: NodeId,
        else_edge: Option<NodeId>,
        flow_params: Option<serde_json::Value>,
    ) -> StepPayload {
        let mut payload = make_step_payload(
            fr,
            node_id,
            NodeType::Branch(BranchNode { condition, then_edge, else_edge }),
        );
        payload.flow_params = flow_params;
        payload
    }

    /// Branch condition: the flow param `flag` exists.
    fn flag_exists() -> BranchCondition {
        BranchCondition::Exists(ParamRef::FlowParam { path: "flag".into() })
    }

    /// Field-mapping transform with a single `from` → `to` mapping.
    fn single_field_mapping(from: &str, to: &str) -> TransformType {
        use worldinterface_core::flowspec::transform::{FieldMapping, FieldMappingSpec};

        TransformType::FieldMapping(FieldMappingSpec {
            mappings: vec![FieldMapping { from: from.into(), to: to.into() }],
        })
    }

    /// Global key under which a step's receipt is stored.
    fn receipt_key(fr: FlowRunId, node_id: NodeId) -> String {
        format!("receipt:step:{}:{}", fr, node_id)
    }

    /// Runs `execute_step` with the default registry, a fresh executor context
    /// and the no-op metrics recorder.
    fn run_step(payload: &StepPayload, store: &Arc<SqliteContextStore>) -> HandlerOutput {
        let registry = default_registry();
        let ctx = make_executor_context();
        execute_step(&ctx, payload, &registry, store, &NOOP_METRICS)
    }

    /// Invocation context built by hand (not via `build_invocation_context`),
    /// for tests that call `invoke_with_receipt` directly.
    fn standalone_invocation_context() -> InvocationContext {
        InvocationContext {
            flow_run_id: FlowRunId::new(),
            node_id: NodeId::new(),
            step_run_id: worldinterface_flowspec::id::derive_step_run_id(
                FlowRunId::new(),
                NodeId::new(),
            ),
            run_id: uuid::Uuid::new_v4(),
            attempt_id: uuid::Uuid::new_v4(),
            attempt_number: 1,
            cancellation: CancellationToken::new(),
        }
    }

    // ------------------------------------------------------------------
    // Connector steps
    // ------------------------------------------------------------------

    #[test]
    fn step_invokes_delay_connector() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        let payload = connector_payload(fr, node_id, "delay", json!({"duration_ms": 10}));

        let result = run_step(&payload, &store);

        assert!(matches!(result, HandlerOutput::Success { .. }));
        assert!(store.get(fr, node_id).unwrap().is_some());
    }

    #[test]
    fn step_invokes_fs_write_connector() {
        let dir = tempfile::tempdir().unwrap();
        let file_path = dir.path().join("test.txt");
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        let payload = connector_payload(
            fr,
            node_id,
            "fs.write",
            json!({
                "path": file_path.to_str().unwrap(),
                "content": "hello world"
            }),
        );

        let result = run_step(&payload, &store);

        assert!(matches!(result, HandlerOutput::Success { .. }));
        assert_eq!(std::fs::read_to_string(&file_path).unwrap(), "hello world");
        assert!(store.get(fr, node_id).unwrap().is_some());
    }

    #[test]
    fn step_connector_not_found() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        let payload = connector_payload(fr, node_id, "nonexistent", json!({}));

        let result = run_step(&payload, &store);

        assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
    }

    #[test]
    fn step_connector_terminal_error() {
        // The target file exists up front and the write uses mode "create".
        let dir = tempfile::tempdir().unwrap();
        let file_path = dir.path().join("test.txt");
        std::fs::write(&file_path, "existing").unwrap();

        let store = make_store();
        let (fr, node_id) = fresh_ids();
        let payload = connector_payload(
            fr,
            node_id,
            "fs.write",
            json!({
                "path": file_path.to_str().unwrap(),
                "content": "new",
                "mode": "create"
            }),
        );

        let result = run_step(&payload, &store);

        assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
    }

    #[test]
    fn step_connector_invalid_params() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        // Empty params for the delay connector.
        let payload = connector_payload(fr, node_id, "delay", json!({}));

        let result = run_step(&payload, &store);

        assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
    }

    // ------------------------------------------------------------------
    // Transform steps
    // ------------------------------------------------------------------

    #[test]
    fn step_executes_identity_transform() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        let input_val = json!({"key": "value", "num": 42});
        let payload =
            transform_payload(fr, node_id, TransformType::Identity, input_val.clone());

        let result = run_step(&payload, &store);

        assert!(matches!(result, HandlerOutput::Success { .. }));
        let stored = store.get(fr, node_id).unwrap().unwrap();
        assert_eq!(stored, input_val);
    }

    #[test]
    fn step_executes_field_mapping() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        let payload =
            transform_payload(fr, node_id, single_field_mapping("a", "b"), json!({"a": 1}));

        let result = run_step(&payload, &store);

        assert!(matches!(result, HandlerOutput::Success { .. }));
        let stored = store.get(fr, node_id).unwrap().unwrap();
        assert_eq!(stored, json!({"b": 1}));
    }

    #[test]
    fn step_transform_error_is_terminal() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        let payload = transform_payload(
            fr,
            node_id,
            single_field_mapping("nonexistent", "out"),
            json!({"a": 1}),
        );

        let result = run_step(&payload, &store);

        assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
    }

    // ------------------------------------------------------------------
    // Branch steps
    // ------------------------------------------------------------------

    #[test]
    fn step_evaluates_exists_true() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        let then_target = NodeId::new();
        let payload = branch_payload(
            fr,
            node_id,
            flag_exists(),
            then_target,
            None,
            Some(json!({"flag": "present"})),
        );

        let result = run_step(&payload, &store);
        assert!(matches!(result, HandlerOutput::Success { .. }));

        let stored = store.get(fr, node_id).unwrap().unwrap();
        let branch_result: BranchResult = serde_json::from_value(stored).unwrap();
        assert!(branch_result.taken);
        assert_eq!(branch_result.then_target, then_target);
    }

    #[test]
    fn step_evaluates_exists_false() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        let then_target = NodeId::new();
        let else_target = NodeId::new();
        let payload = branch_payload(
            fr,
            node_id,
            flag_exists(),
            then_target,
            Some(else_target),
            Some(json!({"flag": null})),
        );

        let result = run_step(&payload, &store);
        assert!(matches!(result, HandlerOutput::Success { .. }));

        let stored = store.get(fr, node_id).unwrap().unwrap();
        let branch_result: BranchResult = serde_json::from_value(stored).unwrap();
        assert!(!branch_result.taken);
    }

    #[test]
    fn step_evaluates_equals_true() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        let then_target = NodeId::new();
        let condition = BranchCondition::Equals {
            left: ParamRef::FlowParam { path: "status".into() },
            right: json!("ok"),
        };
        let payload = branch_payload(
            fr,
            node_id,
            condition,
            then_target,
            None,
            Some(json!({"status": "ok"})),
        );

        let result = run_step(&payload, &store);
        assert!(matches!(result, HandlerOutput::Success { .. }));

        let stored = store.get(fr, node_id).unwrap().unwrap();
        let branch_result: BranchResult = serde_json::from_value(stored).unwrap();
        assert!(branch_result.taken);
    }

    #[test]
    fn step_expression_not_implemented() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        let then_target = NodeId::new();
        let payload = branch_payload(
            fr,
            node_id,
            BranchCondition::Expression("1 + 1".into()),
            then_target,
            None,
            None,
        );

        let result = run_step(&payload, &store);

        assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
    }

    // ------------------------------------------------------------------
    // Idempotent retry
    // ------------------------------------------------------------------

    #[test]
    fn step_skips_invocation_if_output_exists() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();

        // Seed an output before the step runs.
        store.put(fr, node_id, &json!({"pre": "existing"})).unwrap();

        let payload = connector_payload(fr, node_id, "delay", json!({"duration_ms": 10}));
        let result = run_step(&payload, &store);

        assert!(matches!(result, HandlerOutput::Success { .. }));
        // The seeded output must be untouched.
        let stored = store.get(fr, node_id).unwrap().unwrap();
        assert_eq!(stored, json!({"pre": "existing"}));
    }

    // ------------------------------------------------------------------
    // Invocation context
    // ------------------------------------------------------------------

    #[test]
    fn context_has_correct_ids() {
        let ctx = make_executor_context();
        let (fr, node_id) = fresh_ids();
        let payload = connector_payload(fr, node_id, "delay", json!({"duration_ms": 0}));

        let inv_ctx = build_invocation_context(&ctx, &payload);

        assert_eq!(inv_ctx.flow_run_id, fr);
        assert_eq!(inv_ctx.node_id, node_id);
        assert_eq!(inv_ctx.run_id, *ctx.input.run_id.as_uuid());
        assert_eq!(inv_ctx.attempt_id, *ctx.input.attempt_id.as_uuid());
        assert_eq!(inv_ctx.attempt_number, 1);
    }

    #[test]
    fn cancellation_token_bridges_correctly() {
        let ctx = make_executor_context();
        ctx.input.cancellation_context.cancel();

        let payload = connector_payload(FlowRunId::new(), NodeId::new(), "delay", json!({}));

        let inv_ctx = build_invocation_context(&ctx, &payload);
        assert!(inv_ctx.cancellation.is_cancelled());
    }

    // ------------------------------------------------------------------
    // Receipts
    // ------------------------------------------------------------------

    #[test]
    fn connector_step_produces_receipt() {
        let registry = default_registry();
        let connector = registry.get("delay").unwrap();
        let inv_ctx = standalone_invocation_context();
        let params = json!({"duration_ms": 10});

        let (result, receipt) = invoke_with_receipt(connector.as_ref(), &inv_ctx, &params);

        assert!(result.is_ok());
        assert_eq!(receipt.status, worldinterface_core::receipt::ReceiptStatus::Success);
        assert_eq!(receipt.connector, "delay");
    }

    #[test]
    fn connector_step_produces_receipt_on_failure() {
        let registry = default_registry();
        let connector = registry.get("delay").unwrap();
        let inv_ctx = standalone_invocation_context();
        // Empty params for the delay connector.
        let params = json!({});

        let (result, receipt) = invoke_with_receipt(connector.as_ref(), &inv_ctx, &params);

        assert!(result.is_err());
        assert_eq!(receipt.status, worldinterface_core::receipt::ReceiptStatus::Failure);
        assert_eq!(receipt.connector, "delay");
        assert!(receipt.error.is_some());
    }

    #[test]
    fn transform_step_does_not_produce_receipt() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        let payload =
            transform_payload(fr, node_id, TransformType::Identity, json!({"key": "value"}));

        let result = run_step(&payload, &store);

        assert!(matches!(result, HandlerOutput::Success { .. }));
        assert!(store.get(fr, node_id).unwrap().is_some());
    }

    #[test]
    fn connector_step_stores_receipt_on_success() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        let payload = connector_payload(fr, node_id, "delay", json!({"duration_ms": 10}));

        let result = run_step(&payload, &store);
        assert!(matches!(result, HandlerOutput::Success { .. }));

        let receipt = store.get_global(&receipt_key(fr, node_id)).unwrap();
        assert!(receipt.is_some(), "receipt should be stored on success");
        let receipt = receipt.unwrap();
        assert_eq!(receipt.get("connector").and_then(|v| v.as_str()), Some("delay"));
        assert_eq!(receipt.get("status").and_then(|v| v.as_str()), Some("success"));
    }

    #[test]
    fn connector_step_stores_receipt_on_failure() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        // Empty params for the delay connector.
        let payload = connector_payload(fr, node_id, "delay", json!({}));

        let result = run_step(&payload, &store);
        assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));

        let receipt = store.get_global(&receipt_key(fr, node_id)).unwrap();
        assert!(receipt.is_some(), "receipt should be stored even on failure");
        let receipt = receipt.unwrap();
        assert_eq!(receipt.get("status").and_then(|v| v.as_str()), Some("failure"));
        assert!(receipt.get("error").is_some(), "failure receipt should have error field");
    }

    #[test]
    fn receipt_has_correct_flow_run_id() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        let payload = connector_payload(fr, node_id, "delay", json!({"duration_ms": 10}));

        run_step(&payload, &store);

        let receipt = store.get_global(&receipt_key(fr, node_id)).unwrap().unwrap();
        let expected = fr.to_string();
        assert_eq!(
            receipt.get("flow_run_id").and_then(|v| v.as_str()),
            Some(expected.as_str())
        );
    }

    #[test]
    fn receipt_has_nonzero_duration() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        let payload = connector_payload(fr, node_id, "delay", json!({"duration_ms": 50}));

        run_step(&payload, &store);

        let receipt = store.get_global(&receipt_key(fr, node_id)).unwrap().unwrap();
        let duration = receipt.get("duration_ms").and_then(|v| v.as_u64()).unwrap();
        assert!(duration >= 50, "duration should be >= 50ms, got {}ms", duration);
    }

    #[test]
    fn transform_step_does_not_store_receipt() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        let payload =
            transform_payload(fr, node_id, TransformType::Identity, json!({"key": "value"}));

        run_step(&payload, &store);

        let receipt = store.get_global(&receipt_key(fr, node_id)).unwrap();
        assert!(receipt.is_none(), "transform steps should not produce receipts");
    }

    #[test]
    fn branch_step_does_not_produce_receipt() {
        let store = make_store();
        let (fr, node_id) = fresh_ids();
        let then_target = NodeId::new();
        let payload = branch_payload(
            fr,
            node_id,
            flag_exists(),
            then_target,
            None,
            Some(json!({"flag": true})),
        );

        let result = run_step(&payload, &store);
        assert!(matches!(result, HandlerOutput::Success { .. }));
        assert!(store.get(fr, node_id).unwrap().is_some());

        assert!(
            store.get_global(&receipt_key(fr, node_id)).unwrap().is_none(),
            "branch steps should not produce receipts"
        );
    }

    // ------------------------------------------------------------------
    // Tracing
    // ------------------------------------------------------------------

    /// Writer that appends everything into a shared in-memory buffer.
    #[derive(Clone)]
    struct BufWriter(Arc<Mutex<Vec<u8>>>);

    impl io::Write for BufWriter {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            let mut sink = self.0.lock().unwrap();
            sink.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    impl<'a> fmt::MakeWriter<'a> for BufWriter {
        type Writer = BufWriter;

        fn make_writer(&'a self) -> Self::Writer {
            BufWriter(Arc::clone(&self.0))
        }
    }

    /// Runs `f` under a scoped subscriber and returns everything it logged,
    /// including span NEW and CLOSE events.
    fn capture_tracing<F: FnOnce()>(f: F) -> String {
        let buf = Arc::new(Mutex::new(Vec::new()));
        let layer = fmt::layer()
            .with_writer(BufWriter(Arc::clone(&buf)))
            .with_ansi(false)
            .with_target(false)
            .with_level(true)
            .with_span_events(fmt::format::FmtSpan::NEW | fmt::format::FmtSpan::CLOSE);
        let subscriber = tracing_subscriber::registry().with(layer);

        tracing::subscriber::with_default(subscriber, f);

        let bytes = buf.lock().unwrap().clone();
        String::from_utf8(bytes).unwrap()
    }

    #[test]
    fn step_run_span_has_flow_run_id_and_node_id() {
        // Collaborators are built outside the capture so only `execute_step` is traced.
        let store = make_store();
        let registry = default_registry();
        let ctx = make_executor_context();
        let (fr, node_id) = fresh_ids();
        let payload = connector_payload(fr, node_id, "delay", json!({"duration_ms": 10}));

        let output = capture_tracing(|| {
            execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
        });

        assert!(
            output.contains(&fr.to_string()),
            "tracing output should contain flow_run_id {}, got:\n{}",
            fr,
            output
        );
        assert!(
            output.contains(&node_id.to_string()),
            "tracing output should contain node_id {}, got:\n{}",
            node_id,
            output
        );
        assert!(
            output.contains("step_run"),
            "tracing output should contain step_run span name, got:\n{}",
            output
        );
    }

    #[test]
    fn step_run_span_has_step_type() {
        // Collaborators are built outside the capture so only `execute_step` is traced.
        let store = make_store();
        let registry = default_registry();
        let ctx = make_executor_context();
        let (fr, node_id) = fresh_ids();
        let payload = transform_payload(fr, node_id, TransformType::Identity, json!({"x": 1}));

        let output = capture_tracing(|| {
            execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
        });

        assert!(
            output.contains("step_type"),
            "tracing output should contain step_type field, got:\n{}",
            output
        );
        assert!(
            output.contains("transform"),
            "tracing output should contain step_type=transform, got:\n{}",
            output
        );
    }
}