1use std::sync::Arc;
9
10use actionqueue_executor_local::handler::{ExecutorContext, HandlerOutput};
11use serde::{Deserialize, Serialize};
12use worldinterface_connector::execute_transform;
13use worldinterface_connector::invoke_with_receipt;
14use worldinterface_connector::{CancellationToken, ConnectorError, ConnectorRegistry, InvocationContext};
15use worldinterface_contextstore::{AtomicWriter, ContextStore, ContextStoreError};
16use worldinterface_core::flowspec::branch::BranchNode;
17use worldinterface_core::flowspec::{ConnectorNode, NodeType, TransformNode};
18use worldinterface_core::id::NodeId;
19use worldinterface_core::metrics::MetricsRecorder;
20use worldinterface_flowspec::id::derive_step_run_id;
21use worldinterface_flowspec::payload::StepPayload;
22
23use crate::error::{ResolveError, StepError};
24use crate::resolve::resolve_params;
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct BranchResult {
29 pub taken: bool,
31 pub then_target: NodeId,
33 pub else_target: Option<NodeId>,
35}
36
37pub fn execute_step<S: ContextStore>(
41 ctx: &ExecutorContext,
42 payload: &StepPayload,
43 registry: &ConnectorRegistry,
44 store: &Arc<S>,
45 metrics: &dyn MetricsRecorder,
46) -> HandlerOutput {
47 match step_inner(ctx, payload, registry, store, metrics) {
48 Ok(output) => output,
49 Err(e) => map_step_error(e),
50 }
51}
52
53fn step_inner<S: ContextStore>(
54 ctx: &ExecutorContext,
55 payload: &StepPayload,
56 registry: &ConnectorRegistry,
57 store: &Arc<S>,
58 metrics: &dyn MetricsRecorder,
59) -> Result<HandlerOutput, StepError> {
60 let step_type = match &payload.node_type {
61 NodeType::Connector(_) => "connector",
62 NodeType::Transform(_) => "transform",
63 NodeType::Branch(_) => "branch",
64 };
65 let _span = tracing::info_span!(
66 "step_run",
67 flow_run_id = %payload.flow_run_id,
68 node_id = %payload.node_id,
69 step_type = step_type,
70 )
71 .entered();
72
73 match &payload.node_type {
74 NodeType::Connector(connector_node) => {
75 execute_connector_step(ctx, payload, connector_node, registry, store, metrics)
76 }
77 NodeType::Transform(transform_node) => {
78 execute_transform_step(payload, transform_node, store, metrics)
79 }
80 NodeType::Branch(branch_node) => execute_branch_step(payload, branch_node, store, metrics),
81 }
82}
83
84fn execute_connector_step<S: ContextStore>(
86 ctx: &ExecutorContext,
87 payload: &StepPayload,
88 connector_node: &ConnectorNode,
89 registry: &ConnectorRegistry,
90 store: &Arc<S>,
91 metrics: &dyn MetricsRecorder,
92) -> Result<HandlerOutput, StepError> {
93 if let Some(_existing) = store.get(payload.flow_run_id, payload.node_id)? {
95 tracing::info!(
96 %payload.flow_run_id, %payload.node_id,
97 "output already exists in ContextStore (idempotent retry), skipping connector invocation"
98 );
99 return Ok(HandlerOutput::success());
100 }
101
102 let connector = registry
104 .get(&connector_node.connector)
105 .ok_or_else(|| StepError::ConnectorNotFound { name: connector_node.connector.clone() })?;
106
107 let resolved_params = resolve_params(
109 &connector_node.params,
110 payload.flow_run_id,
111 payload.flow_params.as_ref(),
112 store.as_ref(),
113 )?;
114
115 let inv_ctx = build_invocation_context(ctx, payload);
117
118 metrics.record_connector_invocation(&connector_node.connector);
120
121 let (result, receipt) = invoke_with_receipt(connector.as_ref(), &inv_ctx, &resolved_params);
123
124 match result {
125 Ok(output) => {
126 let writer = AtomicWriter::new(Arc::clone(store));
128 writer
129 .write_and_complete(payload.flow_run_id, payload.node_id, &output, || Ok(()))
130 .map_err(|e| match e {
131 worldinterface_contextstore::AtomicWriteError::WriteFailed(store_err) => {
132 StepError::ContextStoreError(store_err)
133 }
134 worldinterface_contextstore::AtomicWriteError::CompletionFailed(e) => {
135 StepError::ContextStoreError(ContextStoreError::StorageError(e.to_string()))
136 }
137 })?;
138
139 metrics.record_contextstore_write();
141
142 let receipt_key = format!("receipt:step:{}:{}", payload.flow_run_id, payload.node_id);
144 if let Ok(receipt_value) = serde_json::to_value(&receipt) {
145 let _ = store.upsert_global(&receipt_key, &receipt_value);
146 }
147
148 metrics.record_step_completed(
150 &connector_node.connector,
151 receipt.duration_ms as f64 / 1000.0,
152 );
153
154 Ok(HandlerOutput::success())
155 }
156 Err(e) => {
157 let receipt_key = format!("receipt:step:{}:{}", payload.flow_run_id, payload.node_id);
160 if let Ok(receipt_value) = serde_json::to_value(&receipt) {
161 let _ = store.upsert_global(&receipt_key, &receipt_value);
162 }
163
164 metrics.record_step_failed(&connector_node.connector);
166
167 Err(StepError::ConnectorError(e))
168 }
169 }
170}
171
172fn execute_transform_step<S: ContextStore>(
174 payload: &StepPayload,
175 transform_node: &TransformNode,
176 store: &Arc<S>,
177 metrics: &dyn MetricsRecorder,
178) -> Result<HandlerOutput, StepError> {
179 if store.get(payload.flow_run_id, payload.node_id)?.is_some() {
181 tracing::info!(
182 %payload.flow_run_id, %payload.node_id,
183 "transform output already exists (idempotent retry), skipping"
184 );
185 return Ok(HandlerOutput::success());
186 }
187
188 let resolved_input = resolve_params(
190 &transform_node.input,
191 payload.flow_run_id,
192 payload.flow_params.as_ref(),
193 store.as_ref(),
194 )?;
195
196 let output = execute_transform(&transform_node.transform, &resolved_input)?;
198
199 store.put(payload.flow_run_id, payload.node_id, &output).or_else(|e| match e {
201 ContextStoreError::AlreadyExists { .. } => Ok(()),
202 other => Err(other),
203 })?;
204
205 metrics.record_contextstore_write();
207
208 Ok(HandlerOutput::success())
209}
210
211fn execute_branch_step<S: ContextStore>(
213 payload: &StepPayload,
214 branch_node: &BranchNode,
215 store: &Arc<S>,
216 metrics: &dyn MetricsRecorder,
217) -> Result<HandlerOutput, StepError> {
218 if store.get(payload.flow_run_id, payload.node_id)?.is_some() {
220 tracing::info!(
221 %payload.flow_run_id, %payload.node_id,
222 "branch result already exists (idempotent retry), skipping"
223 );
224 return Ok(HandlerOutput::success());
225 }
226
227 let taken = crate::branch_eval::evaluate_branch(
229 &branch_node.condition,
230 payload.flow_run_id,
231 payload.flow_params.as_ref(),
232 store.as_ref(),
233 )?;
234
235 let branch_result = BranchResult {
236 taken,
237 then_target: branch_node.then_edge,
238 else_target: branch_node.else_edge,
239 };
240
241 let value = serde_json::to_value(&branch_result).map_err(|e| {
243 StepError::ContextStoreError(ContextStoreError::StorageError(e.to_string()))
244 })?;
245
246 store.put(payload.flow_run_id, payload.node_id, &value).or_else(|e| match e {
247 ContextStoreError::AlreadyExists { .. } => Ok(()),
248 other => Err(other),
249 })?;
250
251 metrics.record_contextstore_write();
253
254 Ok(HandlerOutput::success())
255}
256
257fn build_invocation_context(ctx: &ExecutorContext, payload: &StepPayload) -> InvocationContext {
259 let step_run_id = derive_step_run_id(payload.flow_run_id, payload.node_id);
260
261 let cancellation =
265 CancellationToken::from_flag(ctx.input.cancellation_context.token().cancelled_flag());
266
267 InvocationContext {
268 flow_run_id: payload.flow_run_id,
269 node_id: payload.node_id,
270 step_run_id,
271 run_id: *ctx.input.run_id.as_uuid(),
272 attempt_id: *ctx.input.attempt_id.as_uuid(),
273 attempt_number: ctx.input.metadata.attempt_number,
274 cancellation,
275 }
276}
277
278fn map_step_error(err: StepError) -> HandlerOutput {
280 match &err {
281 StepError::ConnectorError(ConnectorError::Retryable(_)) => {
282 HandlerOutput::retryable_failure(err.to_string())
283 }
284 StepError::ConnectorError(ConnectorError::Cancelled) => {
285 HandlerOutput::retryable_failure(err.to_string())
286 }
287 StepError::ConnectorError(ConnectorError::Terminal(_))
288 | StepError::ConnectorError(ConnectorError::InvalidParams(_)) => {
289 HandlerOutput::terminal_failure(err.to_string())
290 }
291 StepError::ResolveFailed(ResolveError::NodeOutputNotFound { .. }) => {
292 HandlerOutput::retryable_failure(err.to_string())
293 }
294 StepError::ResolveFailed(ResolveError::ContextStoreError(
295 ContextStoreError::StorageError(_),
296 )) => HandlerOutput::retryable_failure(err.to_string()),
297 StepError::ContextStoreError(ContextStoreError::StorageError(_)) => {
298 HandlerOutput::retryable_failure(err.to_string())
299 }
300 _ => HandlerOutput::terminal_failure(err.to_string()),
301 }
302}
303
304#[cfg(test)]
305mod tests {
306 use actionqueue_core::ids::{AttemptId, RunId};
307 use actionqueue_core::task::safety::SafetyLevel;
308 use actionqueue_executor_local::handler::{AttemptMetadata, CancellationContext, HandlerInput};
309 use serde_json::json;
310 use worldinterface_connector::connectors::default_registry;
311 use worldinterface_contextstore::SqliteContextStore;
312 use worldinterface_core::flowspec::branch::{BranchCondition, ParamRef};
313 use worldinterface_core::flowspec::transform::TransformType;
314 use worldinterface_core::id::FlowRunId;
315 use worldinterface_core::metrics::NoopMetricsRecorder;
316
317 use super::*;
318
319 static NOOP_METRICS: NoopMetricsRecorder = NoopMetricsRecorder;
320
321 fn make_store() -> Arc<SqliteContextStore> {
322 Arc::new(SqliteContextStore::in_memory().unwrap())
323 }
324
325 fn make_executor_context() -> ExecutorContext {
326 ExecutorContext {
327 input: HandlerInput {
328 run_id: RunId::new(),
329 attempt_id: AttemptId::new(),
330 payload: vec![],
331 metadata: AttemptMetadata {
332 max_attempts: 3,
333 attempt_number: 1,
334 timeout_secs: None,
335 safety_level: SafetyLevel::Idempotent,
336 },
337 cancellation_context: CancellationContext::new(),
338 },
339 submission: None,
340 children: None,
341 }
342 }
343
344 fn make_step_payload(
345 flow_run_id: FlowRunId,
346 node_id: NodeId,
347 node_type: NodeType,
348 ) -> StepPayload {
349 StepPayload {
350 task_type: worldinterface_flowspec::payload::TaskType::Step,
351 flow_run_id,
352 node_id,
353 node_type,
354 flow_params: None,
355 }
356 }
357
358 #[test]
361 fn step_invokes_delay_connector() {
362 let store = make_store();
363 let registry = default_registry();
364 let ctx = make_executor_context();
365 let fr = FlowRunId::new();
366 let node_id = NodeId::new();
367
368 let payload = make_step_payload(
369 fr,
370 node_id,
371 NodeType::Connector(ConnectorNode {
372 connector: "delay".into(),
373 params: json!({"duration_ms": 10}),
374 idempotency_config: None,
375 }),
376 );
377
378 let result = execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
379 assert!(matches!(result, HandlerOutput::Success { .. }));
380 assert!(store.get(fr, node_id).unwrap().is_some());
381 }
382
383 #[test]
384 fn step_invokes_fs_write_connector() {
385 let dir = tempfile::tempdir().unwrap();
386 let file_path = dir.path().join("test.txt");
387 let store = make_store();
388 let registry = default_registry();
389 let ctx = make_executor_context();
390 let fr = FlowRunId::new();
391 let node_id = NodeId::new();
392
393 let payload = make_step_payload(
394 fr,
395 node_id,
396 NodeType::Connector(ConnectorNode {
397 connector: "fs.write".into(),
398 params: json!({
399 "path": file_path.to_str().unwrap(),
400 "content": "hello world"
401 }),
402 idempotency_config: None,
403 }),
404 );
405
406 let result = execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
407 assert!(matches!(result, HandlerOutput::Success { .. }));
408 assert_eq!(std::fs::read_to_string(&file_path).unwrap(), "hello world");
409 assert!(store.get(fr, node_id).unwrap().is_some());
410 }
411
412 #[test]
413 fn step_connector_not_found() {
414 let store = make_store();
415 let registry = default_registry();
416 let ctx = make_executor_context();
417 let fr = FlowRunId::new();
418 let node_id = NodeId::new();
419
420 let payload = make_step_payload(
421 fr,
422 node_id,
423 NodeType::Connector(ConnectorNode {
424 connector: "nonexistent".into(),
425 params: json!({}),
426 idempotency_config: None,
427 }),
428 );
429
430 let result = execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
431 assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
432 }
433
434 #[test]
435 fn step_connector_terminal_error() {
436 let dir = tempfile::tempdir().unwrap();
437 let file_path = dir.path().join("test.txt");
438 std::fs::write(&file_path, "existing").unwrap();
440
441 let store = make_store();
442 let registry = default_registry();
443 let ctx = make_executor_context();
444 let fr = FlowRunId::new();
445 let node_id = NodeId::new();
446
447 let payload = make_step_payload(
448 fr,
449 node_id,
450 NodeType::Connector(ConnectorNode {
451 connector: "fs.write".into(),
452 params: json!({
453 "path": file_path.to_str().unwrap(),
454 "content": "new",
455 "mode": "create"
456 }),
457 idempotency_config: None,
458 }),
459 );
460
461 let result = execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
462 assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
463 }
464
465 #[test]
466 fn step_connector_invalid_params() {
467 let store = make_store();
468 let registry = default_registry();
469 let ctx = make_executor_context();
470 let fr = FlowRunId::new();
471 let node_id = NodeId::new();
472
473 let payload = make_step_payload(
474 fr,
475 node_id,
476 NodeType::Connector(ConnectorNode {
477 connector: "delay".into(),
478 params: json!({}), idempotency_config: None,
480 }),
481 );
482
483 let result = execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
484 assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
485 }
486
487 #[test]
490 fn step_executes_identity_transform() {
491 let store = make_store();
492 let registry = default_registry();
493 let ctx = make_executor_context();
494 let fr = FlowRunId::new();
495 let node_id = NodeId::new();
496
497 let input_val = json!({"key": "value", "num": 42});
498 let payload = make_step_payload(
499 fr,
500 node_id,
501 NodeType::Transform(TransformNode {
502 transform: TransformType::Identity,
503 input: input_val.clone(),
504 }),
505 );
506
507 let result = execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
508 assert!(matches!(result, HandlerOutput::Success { .. }));
509 let stored = store.get(fr, node_id).unwrap().unwrap();
510 assert_eq!(stored, input_val);
511 }
512
513 #[test]
514 fn step_executes_field_mapping() {
515 use worldinterface_core::flowspec::transform::{FieldMapping, FieldMappingSpec};
516
517 let store = make_store();
518 let registry = default_registry();
519 let ctx = make_executor_context();
520 let fr = FlowRunId::new();
521 let node_id = NodeId::new();
522
523 let payload = make_step_payload(
524 fr,
525 node_id,
526 NodeType::Transform(TransformNode {
527 transform: TransformType::FieldMapping(FieldMappingSpec {
528 mappings: vec![FieldMapping { from: "a".into(), to: "b".into() }],
529 }),
530 input: json!({"a": 1}),
531 }),
532 );
533
534 let result = execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
535 assert!(matches!(result, HandlerOutput::Success { .. }));
536 let stored = store.get(fr, node_id).unwrap().unwrap();
537 assert_eq!(stored, json!({"b": 1}));
538 }
539
540 #[test]
541 fn step_transform_error_is_terminal() {
542 use worldinterface_core::flowspec::transform::{FieldMapping, FieldMappingSpec};
543
544 let store = make_store();
545 let registry = default_registry();
546 let ctx = make_executor_context();
547 let fr = FlowRunId::new();
548 let node_id = NodeId::new();
549
550 let payload = make_step_payload(
551 fr,
552 node_id,
553 NodeType::Transform(TransformNode {
554 transform: TransformType::FieldMapping(FieldMappingSpec {
555 mappings: vec![FieldMapping { from: "nonexistent".into(), to: "out".into() }],
556 }),
557 input: json!({"a": 1}),
558 }),
559 );
560
561 let result = execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
562 assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
563 }
564
565 #[test]
568 fn step_evaluates_exists_true() {
569 let store = make_store();
570 let registry = default_registry();
571 let ctx = make_executor_context();
572 let fr = FlowRunId::new();
573 let node_id = NodeId::new();
574 let then_target = NodeId::new();
575
576 let mut payload = make_step_payload(
577 fr,
578 node_id,
579 NodeType::Branch(BranchNode {
580 condition: BranchCondition::Exists(ParamRef::FlowParam { path: "flag".into() }),
581 then_edge: then_target,
582 else_edge: None,
583 }),
584 );
585 payload.flow_params = Some(json!({"flag": "present"}));
586
587 let result = execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
588 assert!(matches!(result, HandlerOutput::Success { .. }));
589
590 let stored = store.get(fr, node_id).unwrap().unwrap();
591 let branch_result: BranchResult = serde_json::from_value(stored).unwrap();
592 assert!(branch_result.taken);
593 assert_eq!(branch_result.then_target, then_target);
594 }
595
596 #[test]
597 fn step_evaluates_exists_false() {
598 let store = make_store();
599 let registry = default_registry();
600 let ctx = make_executor_context();
601 let fr = FlowRunId::new();
602 let node_id = NodeId::new();
603 let then_target = NodeId::new();
604 let else_target = NodeId::new();
605
606 let mut payload = make_step_payload(
607 fr,
608 node_id,
609 NodeType::Branch(BranchNode {
610 condition: BranchCondition::Exists(ParamRef::FlowParam { path: "flag".into() }),
611 then_edge: then_target,
612 else_edge: Some(else_target),
613 }),
614 );
615 payload.flow_params = Some(json!({"flag": null}));
616
617 let result = execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
618 assert!(matches!(result, HandlerOutput::Success { .. }));
619
620 let stored = store.get(fr, node_id).unwrap().unwrap();
621 let branch_result: BranchResult = serde_json::from_value(stored).unwrap();
622 assert!(!branch_result.taken);
623 }
624
625 #[test]
626 fn step_evaluates_equals_true() {
627 let store = make_store();
628 let registry = default_registry();
629 let ctx = make_executor_context();
630 let fr = FlowRunId::new();
631 let node_id = NodeId::new();
632 let then_target = NodeId::new();
633
634 let mut payload = make_step_payload(
635 fr,
636 node_id,
637 NodeType::Branch(BranchNode {
638 condition: BranchCondition::Equals {
639 left: ParamRef::FlowParam { path: "status".into() },
640 right: json!("ok"),
641 },
642 then_edge: then_target,
643 else_edge: None,
644 }),
645 );
646 payload.flow_params = Some(json!({"status": "ok"}));
647
648 let result = execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
649 assert!(matches!(result, HandlerOutput::Success { .. }));
650
651 let stored = store.get(fr, node_id).unwrap().unwrap();
652 let branch_result: BranchResult = serde_json::from_value(stored).unwrap();
653 assert!(branch_result.taken);
654 }
655
656 #[test]
657 fn step_expression_not_implemented() {
658 let store = make_store();
659 let registry = default_registry();
660 let ctx = make_executor_context();
661 let fr = FlowRunId::new();
662 let node_id = NodeId::new();
663 let then_target = NodeId::new();
664
665 let payload = make_step_payload(
666 fr,
667 node_id,
668 NodeType::Branch(BranchNode {
669 condition: BranchCondition::Expression("1 + 1".into()),
670 then_edge: then_target,
671 else_edge: None,
672 }),
673 );
674
675 let result = execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
676 assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
677 }
678
679 #[test]
682 fn step_skips_invocation_if_output_exists() {
683 let store = make_store();
684 let registry = default_registry();
685 let ctx = make_executor_context();
686 let fr = FlowRunId::new();
687 let node_id = NodeId::new();
688
689 store.put(fr, node_id, &json!({"pre": "existing"})).unwrap();
691
692 let payload = make_step_payload(
693 fr,
694 node_id,
695 NodeType::Connector(ConnectorNode {
696 connector: "delay".into(),
697 params: json!({"duration_ms": 10}),
698 idempotency_config: None,
699 }),
700 );
701
702 let result = execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
703 assert!(matches!(result, HandlerOutput::Success { .. }));
704 let stored = store.get(fr, node_id).unwrap().unwrap();
706 assert_eq!(stored, json!({"pre": "existing"}));
707 }
708
709 #[test]
712 fn context_has_correct_ids() {
713 let ctx = make_executor_context();
714 let fr = FlowRunId::new();
715 let node_id = NodeId::new();
716 let payload = make_step_payload(
717 fr,
718 node_id,
719 NodeType::Connector(ConnectorNode {
720 connector: "delay".into(),
721 params: json!({"duration_ms": 0}),
722 idempotency_config: None,
723 }),
724 );
725
726 let inv_ctx = build_invocation_context(&ctx, &payload);
727 assert_eq!(inv_ctx.flow_run_id, fr);
728 assert_eq!(inv_ctx.node_id, node_id);
729 assert_eq!(inv_ctx.run_id, *ctx.input.run_id.as_uuid());
730 assert_eq!(inv_ctx.attempt_id, *ctx.input.attempt_id.as_uuid());
731 assert_eq!(inv_ctx.attempt_number, 1);
732 }
733
734 #[test]
735 fn cancellation_token_bridges_correctly() {
736 let ctx = {
737 let c = make_executor_context();
738 c.input.cancellation_context.cancel();
739 c
740 };
741
742 let payload = make_step_payload(
743 FlowRunId::new(),
744 NodeId::new(),
745 NodeType::Connector(ConnectorNode {
746 connector: "delay".into(),
747 params: json!({}),
748 idempotency_config: None,
749 }),
750 );
751
752 let inv_ctx = build_invocation_context(&ctx, &payload);
753 assert!(inv_ctx.cancellation.is_cancelled());
754 }
755
756 #[test]
759 fn connector_step_produces_receipt() {
760 let registry = default_registry();
763 let connector = registry.get("delay").unwrap();
764 let inv_ctx = InvocationContext {
765 flow_run_id: FlowRunId::new(),
766 node_id: NodeId::new(),
767 step_run_id: worldinterface_flowspec::id::derive_step_run_id(FlowRunId::new(), NodeId::new()),
768 run_id: uuid::Uuid::new_v4(),
769 attempt_id: uuid::Uuid::new_v4(),
770 attempt_number: 1,
771 cancellation: CancellationToken::new(),
772 };
773 let params = json!({"duration_ms": 10});
774 let (result, receipt) = invoke_with_receipt(connector.as_ref(), &inv_ctx, ¶ms);
775 assert!(result.is_ok());
776 assert_eq!(receipt.status, worldinterface_core::receipt::ReceiptStatus::Success);
777 assert_eq!(receipt.connector, "delay");
778 }
779
780 #[test]
781 fn connector_step_produces_receipt_on_failure() {
782 let registry = default_registry();
783 let connector = registry.get("delay").unwrap();
784 let inv_ctx = InvocationContext {
785 flow_run_id: FlowRunId::new(),
786 node_id: NodeId::new(),
787 step_run_id: worldinterface_flowspec::id::derive_step_run_id(FlowRunId::new(), NodeId::new()),
788 run_id: uuid::Uuid::new_v4(),
789 attempt_id: uuid::Uuid::new_v4(),
790 attempt_number: 1,
791 cancellation: CancellationToken::new(),
792 };
793 let params = json!({}); let (result, receipt) = invoke_with_receipt(connector.as_ref(), &inv_ctx, ¶ms);
795 assert!(result.is_err());
796 assert_eq!(receipt.status, worldinterface_core::receipt::ReceiptStatus::Failure);
797 assert_eq!(receipt.connector, "delay");
798 assert!(receipt.error.is_some());
799 }
800
801 #[test]
802 fn transform_step_does_not_produce_receipt() {
803 let store = make_store();
807 let registry = default_registry();
808 let ctx = make_executor_context();
809 let fr = FlowRunId::new();
810 let node_id = NodeId::new();
811
812 let payload = make_step_payload(
813 fr,
814 node_id,
815 NodeType::Transform(TransformNode {
816 transform: TransformType::Identity,
817 input: json!({"key": "value"}),
818 }),
819 );
820
821 let result = execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
822 assert!(matches!(result, HandlerOutput::Success { .. }));
823 assert!(store.get(fr, node_id).unwrap().is_some());
827 }
828
829 #[test]
832 fn connector_step_stores_receipt_on_success() {
833 let store = make_store();
834 let registry = default_registry();
835 let ctx = make_executor_context();
836 let fr = FlowRunId::new();
837 let node_id = NodeId::new();
838
839 let payload = make_step_payload(
840 fr,
841 node_id,
842 NodeType::Connector(ConnectorNode {
843 connector: "delay".into(),
844 params: json!({"duration_ms": 10}),
845 idempotency_config: None,
846 }),
847 );
848
849 let result = execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
850 assert!(matches!(result, HandlerOutput::Success { .. }));
851
852 let receipt_key = format!("receipt:step:{}:{}", fr, node_id);
854 let receipt = store.get_global(&receipt_key).unwrap();
855 assert!(receipt.is_some(), "receipt should be stored on success");
856 let receipt = receipt.unwrap();
857 assert_eq!(receipt.get("connector").and_then(|v| v.as_str()), Some("delay"));
858 assert_eq!(receipt.get("status").and_then(|v| v.as_str()), Some("success"));
859 }
860
861 #[test]
862 fn connector_step_stores_receipt_on_failure() {
863 let store = make_store();
864 let registry = default_registry();
865 let ctx = make_executor_context();
866 let fr = FlowRunId::new();
867 let node_id = NodeId::new();
868
869 let payload = make_step_payload(
870 fr,
871 node_id,
872 NodeType::Connector(ConnectorNode {
873 connector: "delay".into(),
874 params: json!({}), idempotency_config: None,
876 }),
877 );
878
879 let result = execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
880 assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
881
882 let receipt_key = format!("receipt:step:{}:{}", fr, node_id);
884 let receipt = store.get_global(&receipt_key).unwrap();
885 assert!(receipt.is_some(), "receipt should be stored even on failure");
886 let receipt = receipt.unwrap();
887 assert_eq!(receipt.get("status").and_then(|v| v.as_str()), Some("failure"));
888 assert!(receipt.get("error").is_some(), "failure receipt should have error field");
889 }
890
891 #[test]
892 fn receipt_has_correct_flow_run_id() {
893 let store = make_store();
894 let registry = default_registry();
895 let ctx = make_executor_context();
896 let fr = FlowRunId::new();
897 let node_id = NodeId::new();
898
899 let payload = make_step_payload(
900 fr,
901 node_id,
902 NodeType::Connector(ConnectorNode {
903 connector: "delay".into(),
904 params: json!({"duration_ms": 10}),
905 idempotency_config: None,
906 }),
907 );
908
909 execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
910
911 let receipt_key = format!("receipt:step:{}:{}", fr, node_id);
912 let receipt = store.get_global(&receipt_key).unwrap().unwrap();
913 assert_eq!(
914 receipt.get("flow_run_id").and_then(|v| v.as_str()),
915 Some(fr.to_string()).as_deref()
916 );
917 }
918
919 #[test]
920 fn receipt_has_nonzero_duration() {
921 let store = make_store();
922 let registry = default_registry();
923 let ctx = make_executor_context();
924 let fr = FlowRunId::new();
925 let node_id = NodeId::new();
926
927 let payload = make_step_payload(
928 fr,
929 node_id,
930 NodeType::Connector(ConnectorNode {
931 connector: "delay".into(),
932 params: json!({"duration_ms": 50}),
933 idempotency_config: None,
934 }),
935 );
936
937 execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
938
939 let receipt_key = format!("receipt:step:{}:{}", fr, node_id);
940 let receipt = store.get_global(&receipt_key).unwrap().unwrap();
941 let duration = receipt.get("duration_ms").and_then(|v| v.as_u64()).unwrap();
942 assert!(duration >= 50, "duration should be >= 50ms, got {}ms", duration);
943 }
944
945 #[test]
946 fn transform_step_does_not_store_receipt() {
947 let store = make_store();
948 let registry = default_registry();
949 let ctx = make_executor_context();
950 let fr = FlowRunId::new();
951 let node_id = NodeId::new();
952
953 let payload = make_step_payload(
954 fr,
955 node_id,
956 NodeType::Transform(TransformNode {
957 transform: TransformType::Identity,
958 input: json!({"key": "value"}),
959 }),
960 );
961
962 execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
963
964 let receipt_key = format!("receipt:step:{}:{}", fr, node_id);
966 let receipt = store.get_global(&receipt_key).unwrap();
967 assert!(receipt.is_none(), "transform steps should not produce receipts");
968 }
969
970 #[test]
971 fn branch_step_does_not_produce_receipt() {
972 let store = make_store();
973 let registry = default_registry();
974 let ctx = make_executor_context();
975 let fr = FlowRunId::new();
976 let node_id = NodeId::new();
977 let then_target = NodeId::new();
978
979 let mut payload = make_step_payload(
980 fr,
981 node_id,
982 NodeType::Branch(BranchNode {
983 condition: BranchCondition::Exists(ParamRef::FlowParam { path: "flag".into() }),
984 then_edge: then_target,
985 else_edge: None,
986 }),
987 );
988 payload.flow_params = Some(json!({"flag": true}));
989
990 let result = execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
991 assert!(matches!(result, HandlerOutput::Success { .. }));
992 assert!(store.get(fr, node_id).unwrap().is_some());
996
997 let receipt_key = format!("receipt:step:{}:{}", fr, node_id);
999 assert!(
1000 store.get_global(&receipt_key).unwrap().is_none(),
1001 "branch steps should not produce receipts"
1002 );
1003 }
1004
1005 use std::io;
1008 use std::sync::Mutex;
1009
1010 use tracing_subscriber::fmt;
1011 use tracing_subscriber::prelude::*;
1012
1013 struct BufWriter(Arc<Mutex<Vec<u8>>>);
1015
1016 impl io::Write for BufWriter {
1017 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1018 self.0.lock().unwrap().extend_from_slice(buf);
1019 Ok(buf.len())
1020 }
1021 fn flush(&mut self) -> io::Result<()> {
1022 Ok(())
1023 }
1024 }
1025
1026 impl Clone for BufWriter {
1027 fn clone(&self) -> Self {
1028 BufWriter(Arc::clone(&self.0))
1029 }
1030 }
1031
1032 impl<'a> fmt::MakeWriter<'a> for BufWriter {
1033 type Writer = BufWriter;
1034 fn make_writer(&'a self) -> Self::Writer {
1035 self.clone()
1036 }
1037 }
1038
1039 fn capture_tracing<F: FnOnce()>(f: F) -> String {
1041 let buf = Arc::new(Mutex::new(Vec::new()));
1042 let writer = BufWriter(Arc::clone(&buf));
1043 let subscriber = tracing_subscriber::registry().with(
1044 fmt::layer()
1045 .with_writer(writer)
1046 .with_ansi(false)
1047 .with_target(false)
1048 .with_level(true)
1049 .with_span_events(fmt::format::FmtSpan::NEW | fmt::format::FmtSpan::CLOSE),
1050 );
1051 tracing::subscriber::with_default(subscriber, f);
1052 let bytes = buf.lock().unwrap().clone();
1053 String::from_utf8(bytes).unwrap()
1054 }
1055
1056 #[test]
1057 fn step_run_span_has_flow_run_id_and_node_id() {
1058 let store = make_store();
1059 let registry = default_registry();
1060 let ctx = make_executor_context();
1061 let fr = FlowRunId::new();
1062 let node_id = NodeId::new();
1063
1064 let payload = make_step_payload(
1065 fr,
1066 node_id,
1067 NodeType::Connector(ConnectorNode {
1068 connector: "delay".into(),
1069 params: json!({"duration_ms": 10}),
1070 idempotency_config: None,
1071 }),
1072 );
1073
1074 let output = capture_tracing(|| {
1075 execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
1076 });
1077
1078 assert!(
1079 output.contains(&fr.to_string()),
1080 "tracing output should contain flow_run_id {}, got:\n{}",
1081 fr,
1082 output
1083 );
1084 assert!(
1085 output.contains(&node_id.to_string()),
1086 "tracing output should contain node_id {}, got:\n{}",
1087 node_id,
1088 output
1089 );
1090 assert!(
1091 output.contains("step_run"),
1092 "tracing output should contain step_run span name, got:\n{}",
1093 output
1094 );
1095 }
1096
1097 #[test]
1098 fn step_run_span_has_step_type() {
1099 let store = make_store();
1100 let registry = default_registry();
1101 let ctx = make_executor_context();
1102 let fr = FlowRunId::new();
1103 let node_id = NodeId::new();
1104
1105 let payload = make_step_payload(
1106 fr,
1107 node_id,
1108 NodeType::Transform(TransformNode {
1109 transform: TransformType::Identity,
1110 input: json!({"x": 1}),
1111 }),
1112 );
1113
1114 let output = capture_tracing(|| {
1115 execute_step(&ctx, &payload, ®istry, &store, &NOOP_METRICS);
1116 });
1117
1118 assert!(
1119 output.contains("step_type"),
1120 "tracing output should contain step_type field, got:\n{}",
1121 output
1122 );
1123 assert!(
1124 output.contains("transform"),
1125 "tracing output should contain step_type=transform, got:\n{}",
1126 output
1127 );
1128 }
1129}