Skip to main content

worldinterface_coordinator/
step.rs

1//! Step handler — executes a single FlowSpec node.
2//!
3//! Handles three node types:
4//! - **Connector**: invokes a connector via `invoke_with_receipt()`
5//! - **Transform**: executes a pure transform via `execute_transform()`
6//! - **Branch**: evaluates a condition and writes a `BranchResult` to ContextStore
7
8use std::sync::Arc;
9
10use actionqueue_executor_local::handler::{ExecutorContext, HandlerOutput};
11use serde::{Deserialize, Serialize};
12use worldinterface_connector::execute_transform;
13use worldinterface_connector::invoke_with_receipt;
14use worldinterface_connector::{CancellationToken, ConnectorError, ConnectorRegistry, InvocationContext};
15use worldinterface_contextstore::{AtomicWriter, ContextStore, ContextStoreError};
16use worldinterface_core::flowspec::branch::BranchNode;
17use worldinterface_core::flowspec::{ConnectorNode, NodeType, TransformNode};
18use worldinterface_core::id::NodeId;
19use worldinterface_core::metrics::MetricsRecorder;
20use worldinterface_flowspec::id::derive_step_run_id;
21use worldinterface_flowspec::payload::StepPayload;
22
23use crate::error::{ResolveError, StepError};
24use crate::resolve::resolve_params;
25
/// Result of branch evaluation, stored in ContextStore.
///
/// Built by `execute_branch_step`, serialized with `serde_json::to_value`, and written
/// under the branch node's own `(flow_run_id, node_id)` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BranchResult {
    /// True if the branch condition evaluated to true.
    pub taken: bool,
    /// The NodeId of the then-edge target (copied from `BranchNode::then_edge`).
    pub then_target: NodeId,
    /// The NodeId of the else-edge target (None if no else branch).
    pub else_target: Option<NodeId>,
}
36
37/// Execute a step, returning a HandlerOutput.
38///
39/// This is the core step execution logic, factored out of the handler for testability.
40pub fn execute_step<S: ContextStore>(
41    ctx: &ExecutorContext,
42    payload: &StepPayload,
43    registry: &ConnectorRegistry,
44    store: &Arc<S>,
45    metrics: &dyn MetricsRecorder,
46) -> HandlerOutput {
47    match step_inner(ctx, payload, registry, store, metrics) {
48        Ok(output) => output,
49        Err(e) => map_step_error(e),
50    }
51}
52
53fn step_inner<S: ContextStore>(
54    ctx: &ExecutorContext,
55    payload: &StepPayload,
56    registry: &ConnectorRegistry,
57    store: &Arc<S>,
58    metrics: &dyn MetricsRecorder,
59) -> Result<HandlerOutput, StepError> {
60    let step_type = match &payload.node_type {
61        NodeType::Connector(_) => "connector",
62        NodeType::Transform(_) => "transform",
63        NodeType::Branch(_) => "branch",
64    };
65    let _span = tracing::info_span!(
66        "step_run",
67        flow_run_id = %payload.flow_run_id,
68        node_id = %payload.node_id,
69        step_type = step_type,
70    )
71    .entered();
72
73    match &payload.node_type {
74        NodeType::Connector(connector_node) => {
75            execute_connector_step(ctx, payload, connector_node, registry, store, metrics)
76        }
77        NodeType::Transform(transform_node) => {
78            execute_transform_step(payload, transform_node, store, metrics)
79        }
80        NodeType::Branch(branch_node) => execute_branch_step(payload, branch_node, store, metrics),
81    }
82}
83
84/// Execute a connector step with crash recovery and receipt generation.
85fn execute_connector_step<S: ContextStore>(
86    ctx: &ExecutorContext,
87    payload: &StepPayload,
88    connector_node: &ConnectorNode,
89    registry: &ConnectorRegistry,
90    store: &Arc<S>,
91    metrics: &dyn MetricsRecorder,
92) -> Result<HandlerOutput, StepError> {
93    // Crash recovery: check if output already exists in ContextStore (Invariant 2 + 4)
94    if let Some(_existing) = store.get(payload.flow_run_id, payload.node_id)? {
95        tracing::info!(
96            %payload.flow_run_id, %payload.node_id,
97            "output already exists in ContextStore (idempotent retry), skipping connector invocation"
98        );
99        return Ok(HandlerOutput::success());
100    }
101
102    // Look up connector
103    let connector = registry
104        .get(&connector_node.connector)
105        .ok_or_else(|| StepError::ConnectorNotFound { name: connector_node.connector.clone() })?;
106
107    // Resolve parameters
108    let resolved_params = resolve_params(
109        &connector_node.params,
110        payload.flow_run_id,
111        payload.flow_params.as_ref(),
112        store.as_ref(),
113    )?;
114
115    // Build InvocationContext
116    let inv_ctx = build_invocation_context(ctx, payload);
117
118    // Record connector invocation metric
119    metrics.record_connector_invocation(&connector_node.connector);
120
121    // Invoke with receipt (Invariant 6)
122    let (result, receipt) = invoke_with_receipt(connector.as_ref(), &inv_ctx, &resolved_params);
123
124    match result {
125        Ok(output) => {
126            // Write output to ContextStore via AtomicWriter (Invariant 2)
127            let writer = AtomicWriter::new(Arc::clone(store));
128            writer
129                .write_and_complete(payload.flow_run_id, payload.node_id, &output, || Ok(()))
130                .map_err(|e| match e {
131                    worldinterface_contextstore::AtomicWriteError::WriteFailed(store_err) => {
132                        StepError::ContextStoreError(store_err)
133                    }
134                    worldinterface_contextstore::AtomicWriteError::CompletionFailed(e) => {
135                        StepError::ContextStoreError(ContextStoreError::StorageError(e.to_string()))
136                    }
137                })?;
138
139            // Record ContextStore write metric
140            metrics.record_contextstore_write();
141
142            // Store receipt durably (Invariant 6)
143            let receipt_key = format!("receipt:step:{}:{}", payload.flow_run_id, payload.node_id);
144            if let Ok(receipt_value) = serde_json::to_value(&receipt) {
145                let _ = store.upsert_global(&receipt_key, &receipt_value);
146            }
147
148            // Record success metrics
149            metrics.record_step_completed(
150                &connector_node.connector,
151                receipt.duration_ms as f64 / 1000.0,
152            );
153
154            Ok(HandlerOutput::success())
155        }
156        Err(e) => {
157            // Store failure receipt durably (Invariant 6 — receipts even on failure)
158            // Best-effort: failure receipt storage should not mask the original error
159            let receipt_key = format!("receipt:step:{}:{}", payload.flow_run_id, payload.node_id);
160            if let Ok(receipt_value) = serde_json::to_value(&receipt) {
161                let _ = store.upsert_global(&receipt_key, &receipt_value);
162            }
163
164            // Record failure metrics
165            metrics.record_step_failed(&connector_node.connector);
166
167            Err(StepError::ConnectorError(e))
168        }
169    }
170}
171
172/// Execute a transform step.
173fn execute_transform_step<S: ContextStore>(
174    payload: &StepPayload,
175    transform_node: &TransformNode,
176    store: &Arc<S>,
177    metrics: &dyn MetricsRecorder,
178) -> Result<HandlerOutput, StepError> {
179    // Crash recovery: check if output already exists
180    if store.get(payload.flow_run_id, payload.node_id)?.is_some() {
181        tracing::info!(
182            %payload.flow_run_id, %payload.node_id,
183            "transform output already exists (idempotent retry), skipping"
184        );
185        return Ok(HandlerOutput::success());
186    }
187
188    // Resolve transform input (may contain template references)
189    let resolved_input = resolve_params(
190        &transform_node.input,
191        payload.flow_run_id,
192        payload.flow_params.as_ref(),
193        store.as_ref(),
194    )?;
195
196    // Execute the transform
197    let output = execute_transform(&transform_node.transform, &resolved_input)?;
198
199    // Write to ContextStore
200    store.put(payload.flow_run_id, payload.node_id, &output).or_else(|e| match e {
201        ContextStoreError::AlreadyExists { .. } => Ok(()),
202        other => Err(other),
203    })?;
204
205    // Record ContextStore write metric
206    metrics.record_contextstore_write();
207
208    Ok(HandlerOutput::success())
209}
210
211/// Execute a branch step.
212fn execute_branch_step<S: ContextStore>(
213    payload: &StepPayload,
214    branch_node: &BranchNode,
215    store: &Arc<S>,
216    metrics: &dyn MetricsRecorder,
217) -> Result<HandlerOutput, StepError> {
218    // Crash recovery: check if branch result already exists
219    if store.get(payload.flow_run_id, payload.node_id)?.is_some() {
220        tracing::info!(
221            %payload.flow_run_id, %payload.node_id,
222            "branch result already exists (idempotent retry), skipping"
223        );
224        return Ok(HandlerOutput::success());
225    }
226
227    // Evaluate the branch condition
228    let taken = crate::branch_eval::evaluate_branch(
229        &branch_node.condition,
230        payload.flow_run_id,
231        payload.flow_params.as_ref(),
232        store.as_ref(),
233    )?;
234
235    let branch_result = BranchResult {
236        taken,
237        then_target: branch_node.then_edge,
238        else_target: branch_node.else_edge,
239    };
240
241    // Write BranchResult to ContextStore
242    let value = serde_json::to_value(&branch_result).map_err(|e| {
243        StepError::ContextStoreError(ContextStoreError::StorageError(e.to_string()))
244    })?;
245
246    store.put(payload.flow_run_id, payload.node_id, &value).or_else(|e| match e {
247        ContextStoreError::AlreadyExists { .. } => Ok(()),
248        other => Err(other),
249    })?;
250
251    // Record ContextStore write metric
252    metrics.record_contextstore_write();
253
254    Ok(HandlerOutput::success())
255}
256
257/// Build an InvocationContext from AQ's ExecutorContext and the StepPayload.
258fn build_invocation_context(ctx: &ExecutorContext, payload: &StepPayload) -> InvocationContext {
259    let step_run_id = derive_step_run_id(payload.flow_run_id, payload.node_id);
260
261    // Bridge cancellation: share AQ's underlying AtomicBool with UI's token.
262    // AQ's watchdog thread or dispatch loop can cancel at any time; this
263    // gives UI connectors live visibility into that signal with zero overhead.
264    let cancellation =
265        CancellationToken::from_flag(ctx.input.cancellation_context.token().cancelled_flag());
266
267    InvocationContext {
268        flow_run_id: payload.flow_run_id,
269        node_id: payload.node_id,
270        step_run_id,
271        run_id: *ctx.input.run_id.as_uuid(),
272        attempt_id: *ctx.input.attempt_id.as_uuid(),
273        attempt_number: ctx.input.metadata.attempt_number,
274        cancellation,
275    }
276}
277
278/// Map StepError to HandlerOutput.
279fn map_step_error(err: StepError) -> HandlerOutput {
280    match &err {
281        StepError::ConnectorError(ConnectorError::Retryable(_)) => {
282            HandlerOutput::retryable_failure(err.to_string())
283        }
284        StepError::ConnectorError(ConnectorError::Cancelled) => {
285            HandlerOutput::retryable_failure(err.to_string())
286        }
287        StepError::ConnectorError(ConnectorError::Terminal(_))
288        | StepError::ConnectorError(ConnectorError::InvalidParams(_)) => {
289            HandlerOutput::terminal_failure(err.to_string())
290        }
291        StepError::ResolveFailed(ResolveError::NodeOutputNotFound { .. }) => {
292            HandlerOutput::retryable_failure(err.to_string())
293        }
294        StepError::ResolveFailed(ResolveError::ContextStoreError(
295            ContextStoreError::StorageError(_),
296        )) => HandlerOutput::retryable_failure(err.to_string()),
297        StepError::ContextStoreError(ContextStoreError::StorageError(_)) => {
298            HandlerOutput::retryable_failure(err.to_string())
299        }
300        _ => HandlerOutput::terminal_failure(err.to_string()),
301    }
302}
303
304#[cfg(test)]
305mod tests {
306    use actionqueue_core::ids::{AttemptId, RunId};
307    use actionqueue_core::task::safety::SafetyLevel;
308    use actionqueue_executor_local::handler::{AttemptMetadata, CancellationContext, HandlerInput};
309    use serde_json::json;
310    use worldinterface_connector::connectors::default_registry;
311    use worldinterface_contextstore::SqliteContextStore;
312    use worldinterface_core::flowspec::branch::{BranchCondition, ParamRef};
313    use worldinterface_core::flowspec::transform::TransformType;
314    use worldinterface_core::id::FlowRunId;
315    use worldinterface_core::metrics::NoopMetricsRecorder;
316
317    use super::*;
318
319    static NOOP_METRICS: NoopMetricsRecorder = NoopMetricsRecorder;
320
321    fn make_store() -> Arc<SqliteContextStore> {
322        Arc::new(SqliteContextStore::in_memory().unwrap())
323    }
324
325    fn make_executor_context() -> ExecutorContext {
326        ExecutorContext {
327            input: HandlerInput {
328                run_id: RunId::new(),
329                attempt_id: AttemptId::new(),
330                payload: vec![],
331                metadata: AttemptMetadata {
332                    max_attempts: 3,
333                    attempt_number: 1,
334                    timeout_secs: None,
335                    safety_level: SafetyLevel::Idempotent,
336                },
337                cancellation_context: CancellationContext::new(),
338            },
339            submission: None,
340            children: None,
341        }
342    }
343
344    fn make_step_payload(
345        flow_run_id: FlowRunId,
346        node_id: NodeId,
347        node_type: NodeType,
348    ) -> StepPayload {
349        StepPayload {
350            task_type: worldinterface_flowspec::payload::TaskType::Step,
351            flow_run_id,
352            node_id,
353            node_type,
354            flow_params: None,
355        }
356    }
357
358    // T-2: Step Handler — Connector Invocation
359
360    #[test]
361    fn step_invokes_delay_connector() {
362        let store = make_store();
363        let registry = default_registry();
364        let ctx = make_executor_context();
365        let fr = FlowRunId::new();
366        let node_id = NodeId::new();
367
368        let payload = make_step_payload(
369            fr,
370            node_id,
371            NodeType::Connector(ConnectorNode {
372                connector: "delay".into(),
373                params: json!({"duration_ms": 10}),
374                idempotency_config: None,
375            }),
376        );
377
378        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
379        assert!(matches!(result, HandlerOutput::Success { .. }));
380        assert!(store.get(fr, node_id).unwrap().is_some());
381    }
382
383    #[test]
384    fn step_invokes_fs_write_connector() {
385        let dir = tempfile::tempdir().unwrap();
386        let file_path = dir.path().join("test.txt");
387        let store = make_store();
388        let registry = default_registry();
389        let ctx = make_executor_context();
390        let fr = FlowRunId::new();
391        let node_id = NodeId::new();
392
393        let payload = make_step_payload(
394            fr,
395            node_id,
396            NodeType::Connector(ConnectorNode {
397                connector: "fs.write".into(),
398                params: json!({
399                    "path": file_path.to_str().unwrap(),
400                    "content": "hello world"
401                }),
402                idempotency_config: None,
403            }),
404        );
405
406        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
407        assert!(matches!(result, HandlerOutput::Success { .. }));
408        assert_eq!(std::fs::read_to_string(&file_path).unwrap(), "hello world");
409        assert!(store.get(fr, node_id).unwrap().is_some());
410    }
411
412    #[test]
413    fn step_connector_not_found() {
414        let store = make_store();
415        let registry = default_registry();
416        let ctx = make_executor_context();
417        let fr = FlowRunId::new();
418        let node_id = NodeId::new();
419
420        let payload = make_step_payload(
421            fr,
422            node_id,
423            NodeType::Connector(ConnectorNode {
424                connector: "nonexistent".into(),
425                params: json!({}),
426                idempotency_config: None,
427            }),
428        );
429
430        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
431        assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
432    }
433
434    #[test]
435    fn step_connector_terminal_error() {
436        let dir = tempfile::tempdir().unwrap();
437        let file_path = dir.path().join("test.txt");
438        // Create file so "create" mode fails
439        std::fs::write(&file_path, "existing").unwrap();
440
441        let store = make_store();
442        let registry = default_registry();
443        let ctx = make_executor_context();
444        let fr = FlowRunId::new();
445        let node_id = NodeId::new();
446
447        let payload = make_step_payload(
448            fr,
449            node_id,
450            NodeType::Connector(ConnectorNode {
451                connector: "fs.write".into(),
452                params: json!({
453                    "path": file_path.to_str().unwrap(),
454                    "content": "new",
455                    "mode": "create"
456                }),
457                idempotency_config: None,
458            }),
459        );
460
461        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
462        assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
463    }
464
465    #[test]
466    fn step_connector_invalid_params() {
467        let store = make_store();
468        let registry = default_registry();
469        let ctx = make_executor_context();
470        let fr = FlowRunId::new();
471        let node_id = NodeId::new();
472
473        let payload = make_step_payload(
474            fr,
475            node_id,
476            NodeType::Connector(ConnectorNode {
477                connector: "delay".into(),
478                params: json!({}), // missing duration_ms
479                idempotency_config: None,
480            }),
481        );
482
483        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
484        assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
485    }
486
487    // T-3: Step Handler — Transform Execution
488
489    #[test]
490    fn step_executes_identity_transform() {
491        let store = make_store();
492        let registry = default_registry();
493        let ctx = make_executor_context();
494        let fr = FlowRunId::new();
495        let node_id = NodeId::new();
496
497        let input_val = json!({"key": "value", "num": 42});
498        let payload = make_step_payload(
499            fr,
500            node_id,
501            NodeType::Transform(TransformNode {
502                transform: TransformType::Identity,
503                input: input_val.clone(),
504            }),
505        );
506
507        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
508        assert!(matches!(result, HandlerOutput::Success { .. }));
509        let stored = store.get(fr, node_id).unwrap().unwrap();
510        assert_eq!(stored, input_val);
511    }
512
513    #[test]
514    fn step_executes_field_mapping() {
515        use worldinterface_core::flowspec::transform::{FieldMapping, FieldMappingSpec};
516
517        let store = make_store();
518        let registry = default_registry();
519        let ctx = make_executor_context();
520        let fr = FlowRunId::new();
521        let node_id = NodeId::new();
522
523        let payload = make_step_payload(
524            fr,
525            node_id,
526            NodeType::Transform(TransformNode {
527                transform: TransformType::FieldMapping(FieldMappingSpec {
528                    mappings: vec![FieldMapping { from: "a".into(), to: "b".into() }],
529                }),
530                input: json!({"a": 1}),
531            }),
532        );
533
534        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
535        assert!(matches!(result, HandlerOutput::Success { .. }));
536        let stored = store.get(fr, node_id).unwrap().unwrap();
537        assert_eq!(stored, json!({"b": 1}));
538    }
539
540    #[test]
541    fn step_transform_error_is_terminal() {
542        use worldinterface_core::flowspec::transform::{FieldMapping, FieldMappingSpec};
543
544        let store = make_store();
545        let registry = default_registry();
546        let ctx = make_executor_context();
547        let fr = FlowRunId::new();
548        let node_id = NodeId::new();
549
550        let payload = make_step_payload(
551            fr,
552            node_id,
553            NodeType::Transform(TransformNode {
554                transform: TransformType::FieldMapping(FieldMappingSpec {
555                    mappings: vec![FieldMapping { from: "nonexistent".into(), to: "out".into() }],
556                }),
557                input: json!({"a": 1}),
558            }),
559        );
560
561        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
562        assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
563    }
564
565    // T-4: Step Handler — Branch Evaluation
566
567    #[test]
568    fn step_evaluates_exists_true() {
569        let store = make_store();
570        let registry = default_registry();
571        let ctx = make_executor_context();
572        let fr = FlowRunId::new();
573        let node_id = NodeId::new();
574        let then_target = NodeId::new();
575
576        let mut payload = make_step_payload(
577            fr,
578            node_id,
579            NodeType::Branch(BranchNode {
580                condition: BranchCondition::Exists(ParamRef::FlowParam { path: "flag".into() }),
581                then_edge: then_target,
582                else_edge: None,
583            }),
584        );
585        payload.flow_params = Some(json!({"flag": "present"}));
586
587        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
588        assert!(matches!(result, HandlerOutput::Success { .. }));
589
590        let stored = store.get(fr, node_id).unwrap().unwrap();
591        let branch_result: BranchResult = serde_json::from_value(stored).unwrap();
592        assert!(branch_result.taken);
593        assert_eq!(branch_result.then_target, then_target);
594    }
595
596    #[test]
597    fn step_evaluates_exists_false() {
598        let store = make_store();
599        let registry = default_registry();
600        let ctx = make_executor_context();
601        let fr = FlowRunId::new();
602        let node_id = NodeId::new();
603        let then_target = NodeId::new();
604        let else_target = NodeId::new();
605
606        let mut payload = make_step_payload(
607            fr,
608            node_id,
609            NodeType::Branch(BranchNode {
610                condition: BranchCondition::Exists(ParamRef::FlowParam { path: "flag".into() }),
611                then_edge: then_target,
612                else_edge: Some(else_target),
613            }),
614        );
615        payload.flow_params = Some(json!({"flag": null}));
616
617        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
618        assert!(matches!(result, HandlerOutput::Success { .. }));
619
620        let stored = store.get(fr, node_id).unwrap().unwrap();
621        let branch_result: BranchResult = serde_json::from_value(stored).unwrap();
622        assert!(!branch_result.taken);
623    }
624
625    #[test]
626    fn step_evaluates_equals_true() {
627        let store = make_store();
628        let registry = default_registry();
629        let ctx = make_executor_context();
630        let fr = FlowRunId::new();
631        let node_id = NodeId::new();
632        let then_target = NodeId::new();
633
634        let mut payload = make_step_payload(
635            fr,
636            node_id,
637            NodeType::Branch(BranchNode {
638                condition: BranchCondition::Equals {
639                    left: ParamRef::FlowParam { path: "status".into() },
640                    right: json!("ok"),
641                },
642                then_edge: then_target,
643                else_edge: None,
644            }),
645        );
646        payload.flow_params = Some(json!({"status": "ok"}));
647
648        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
649        assert!(matches!(result, HandlerOutput::Success { .. }));
650
651        let stored = store.get(fr, node_id).unwrap().unwrap();
652        let branch_result: BranchResult = serde_json::from_value(stored).unwrap();
653        assert!(branch_result.taken);
654    }
655
656    #[test]
657    fn step_expression_not_implemented() {
658        let store = make_store();
659        let registry = default_registry();
660        let ctx = make_executor_context();
661        let fr = FlowRunId::new();
662        let node_id = NodeId::new();
663        let then_target = NodeId::new();
664
665        let payload = make_step_payload(
666            fr,
667            node_id,
668            NodeType::Branch(BranchNode {
669                condition: BranchCondition::Expression("1 + 1".into()),
670                then_edge: then_target,
671                else_edge: None,
672            }),
673        );
674
675        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
676        assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
677    }
678
679    // T-5: Step Handler — Crash Recovery
680
681    #[test]
682    fn step_skips_invocation_if_output_exists() {
683        let store = make_store();
684        let registry = default_registry();
685        let ctx = make_executor_context();
686        let fr = FlowRunId::new();
687        let node_id = NodeId::new();
688
689        // Pre-write output
690        store.put(fr, node_id, &json!({"pre": "existing"})).unwrap();
691
692        let payload = make_step_payload(
693            fr,
694            node_id,
695            NodeType::Connector(ConnectorNode {
696                connector: "delay".into(),
697                params: json!({"duration_ms": 10}),
698                idempotency_config: None,
699            }),
700        );
701
702        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
703        assert!(matches!(result, HandlerOutput::Success { .. }));
704        // Output should still be the pre-existing one
705        let stored = store.get(fr, node_id).unwrap().unwrap();
706        assert_eq!(stored, json!({"pre": "existing"}));
707    }
708
709    // T-11: InvocationContext Construction
710
711    #[test]
712    fn context_has_correct_ids() {
713        let ctx = make_executor_context();
714        let fr = FlowRunId::new();
715        let node_id = NodeId::new();
716        let payload = make_step_payload(
717            fr,
718            node_id,
719            NodeType::Connector(ConnectorNode {
720                connector: "delay".into(),
721                params: json!({"duration_ms": 0}),
722                idempotency_config: None,
723            }),
724        );
725
726        let inv_ctx = build_invocation_context(&ctx, &payload);
727        assert_eq!(inv_ctx.flow_run_id, fr);
728        assert_eq!(inv_ctx.node_id, node_id);
729        assert_eq!(inv_ctx.run_id, *ctx.input.run_id.as_uuid());
730        assert_eq!(inv_ctx.attempt_id, *ctx.input.attempt_id.as_uuid());
731        assert_eq!(inv_ctx.attempt_number, 1);
732    }
733
734    #[test]
735    fn cancellation_token_bridges_correctly() {
736        let ctx = {
737            let c = make_executor_context();
738            c.input.cancellation_context.cancel();
739            c
740        };
741
742        let payload = make_step_payload(
743            FlowRunId::new(),
744            NodeId::new(),
745            NodeType::Connector(ConnectorNode {
746                connector: "delay".into(),
747                params: json!({}),
748                idempotency_config: None,
749            }),
750        );
751
752        let inv_ctx = build_invocation_context(&ctx, &payload);
753        assert!(inv_ctx.cancellation.is_cancelled());
754    }
755
756    // T-10: Receipt Generation (Invariant 6)
757
758    #[test]
759    fn connector_step_produces_receipt() {
760        // invoke_with_receipt always produces a receipt, even on success.
761        // Verify by calling it directly with a real connector.
762        let registry = default_registry();
763        let connector = registry.get("delay").unwrap();
764        let inv_ctx = InvocationContext {
765            flow_run_id: FlowRunId::new(),
766            node_id: NodeId::new(),
767            step_run_id: worldinterface_flowspec::id::derive_step_run_id(FlowRunId::new(), NodeId::new()),
768            run_id: uuid::Uuid::new_v4(),
769            attempt_id: uuid::Uuid::new_v4(),
770            attempt_number: 1,
771            cancellation: CancellationToken::new(),
772        };
773        let params = json!({"duration_ms": 10});
774        let (result, receipt) = invoke_with_receipt(connector.as_ref(), &inv_ctx, &params);
775        assert!(result.is_ok());
776        assert_eq!(receipt.status, worldinterface_core::receipt::ReceiptStatus::Success);
777        assert_eq!(receipt.connector, "delay");
778    }
779
780    #[test]
781    fn connector_step_produces_receipt_on_failure() {
782        let registry = default_registry();
783        let connector = registry.get("delay").unwrap();
784        let inv_ctx = InvocationContext {
785            flow_run_id: FlowRunId::new(),
786            node_id: NodeId::new(),
787            step_run_id: worldinterface_flowspec::id::derive_step_run_id(FlowRunId::new(), NodeId::new()),
788            run_id: uuid::Uuid::new_v4(),
789            attempt_id: uuid::Uuid::new_v4(),
790            attempt_number: 1,
791            cancellation: CancellationToken::new(),
792        };
793        let params = json!({}); // missing duration_ms → InvalidParams
794        let (result, receipt) = invoke_with_receipt(connector.as_ref(), &inv_ctx, &params);
795        assert!(result.is_err());
796        assert_eq!(receipt.status, worldinterface_core::receipt::ReceiptStatus::Failure);
797        assert_eq!(receipt.connector, "delay");
798        assert!(receipt.error.is_some());
799    }
800
#[test]
fn transform_step_does_not_produce_receipt() {
    // Transform nodes persist their output to the ContextStore but are routed
    // through execute_transform_step, never execute_connector_step (the only
    // path that calls invoke_with_receipt). Running an identity transform and
    // finding its output in the store shows the transform path was taken.
    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();
    let flow_run = FlowRunId::new();
    let transform_node = NodeId::new();

    let identity = TransformNode {
        transform: TransformType::Identity,
        input: json!({"key": "value"}),
    };
    let payload = make_step_payload(flow_run, transform_node, NodeType::Transform(identity));

    let outcome = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
    assert!(matches!(outcome, HandlerOutput::Success { .. }));

    // The step's output must exist under (flow run, node): the transform ran,
    // and no connector invocation (hence no receipt) was involved.
    let written = store.get(flow_run, transform_node).unwrap();
    assert!(written.is_some());
}
828
829    // T-4: Durable Receipt Storage (Sprint 8)
830
#[test]
fn connector_step_stores_receipt_on_success() {
    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();
    let flow_run = FlowRunId::new();
    let delay_node = NodeId::new();

    // A well-formed `delay` invocation, expected to succeed.
    let delay = ConnectorNode {
        connector: "delay".into(),
        params: json!({"duration_ms": 10}),
        idempotency_config: None,
    };
    let payload = make_step_payload(flow_run, delay_node, NodeType::Connector(delay));

    let outcome = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
    assert!(matches!(outcome, HandlerOutput::Success { .. }));

    // The receipt is looked up in the ContextStore globals, keyed by flow run and node.
    let key = format!("receipt:step:{}:{}", flow_run, delay_node);
    let stored = store.get_global(&key).unwrap();
    assert!(stored.is_some(), "receipt should be stored on success");
    let receipt = stored.unwrap();

    let connector_name = receipt.get("connector").and_then(|v| v.as_str());
    assert_eq!(connector_name, Some("delay"));
    let status = receipt.get("status").and_then(|v| v.as_str());
    assert_eq!(status, Some("success"));
}
860
#[test]
fn connector_step_stores_receipt_on_failure() {
    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();
    let flow_run = FlowRunId::new();
    let delay_node = NodeId::new();

    let broken_delay = ConnectorNode {
        connector: "delay".into(),
        params: json!({}), // missing duration_ms → InvalidParams → failure
        idempotency_config: None,
    };
    let payload = make_step_payload(flow_run, delay_node, NodeType::Connector(broken_delay));

    let outcome = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
    assert!(matches!(outcome, HandlerOutput::TerminalFailure { .. }));

    // Even though the step failed terminally, a (best-effort) receipt is expected.
    let key = format!("receipt:step:{}:{}", flow_run, delay_node);
    let stored = store.get_global(&key).unwrap();
    assert!(stored.is_some(), "receipt should be stored even on failure");
    let receipt = stored.unwrap();

    let status = receipt.get("status").and_then(|v| v.as_str());
    assert_eq!(status, Some("failure"));
    let error_field = receipt.get("error");
    assert!(error_field.is_some(), "failure receipt should have error field");
}
890
#[test]
fn receipt_has_correct_flow_run_id() {
    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();
    let flow_run = FlowRunId::new();
    let delay_node = NodeId::new();

    let delay = ConnectorNode {
        connector: "delay".into(),
        params: json!({"duration_ms": 10}),
        idempotency_config: None,
    };
    let payload = make_step_payload(flow_run, delay_node, NodeType::Connector(delay));

    // Only the stored receipt is inspected here; the handler output is not checked.
    execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);

    let key = format!("receipt:step:{}:{}", flow_run, delay_node);
    let receipt = store.get_global(&key).unwrap().unwrap();

    // The receipt's `flow_run_id` field must be the string form of the run id
    // the step was executed under.
    let expected = flow_run.to_string();
    let recorded = receipt.get("flow_run_id").and_then(|v| v.as_str());
    assert_eq!(recorded, Some(expected.as_str()));
}
918
#[test]
fn receipt_has_nonzero_duration() {
    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();
    let flow_run = FlowRunId::new();
    let delay_node = NodeId::new();

    // Ask the `delay` connector for 50 ms so the recorded duration has a known floor.
    let delay = ConnectorNode {
        connector: "delay".into(),
        params: json!({"duration_ms": 50}),
        idempotency_config: None,
    };
    let payload = make_step_payload(flow_run, delay_node, NodeType::Connector(delay));

    execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);

    let key = format!("receipt:step:{}:{}", flow_run, delay_node);
    let receipt = store.get_global(&key).unwrap().unwrap();

    // `duration_ms` must be present, an unsigned integer, and at least the requested delay.
    let elapsed_ms = receipt.get("duration_ms").and_then(|v| v.as_u64()).unwrap();
    assert!(elapsed_ms >= 50, "duration should be >= 50ms, got {}ms", elapsed_ms);
}
944
#[test]
fn transform_step_does_not_store_receipt() {
    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();
    let fr = FlowRunId::new();
    let node_id = NodeId::new();

    let payload = make_step_payload(
        fr,
        node_id,
        NodeType::Transform(TransformNode {
            transform: TransformType::Identity,
            input: json!({"key": "value"}),
        }),
    );

    // Require that the transform actually ran. Without this check the
    // "no receipt" assertion below would also pass if the step had failed
    // before doing anything, making the test vacuous.
    let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
    assert!(matches!(result, HandlerOutput::Success { .. }));

    // Transform steps should NOT store receipts (no boundary crossing)
    let receipt_key = format!("receipt:step:{}:{}", fr, node_id);
    let receipt = store.get_global(&receipt_key).unwrap();
    assert!(receipt.is_none(), "transform steps should not produce receipts");
}
969
#[test]
fn branch_step_does_not_produce_receipt() {
    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();
    let flow_run = FlowRunId::new();
    let branch_node = NodeId::new();
    let then_target = NodeId::new();

    // Branch on the existence of the flow parameter `flag`; there is no else edge.
    let branch = BranchNode {
        condition: BranchCondition::Exists(ParamRef::FlowParam { path: "flag".into() }),
        then_edge: then_target,
        else_edge: None,
    };
    let mut payload = make_step_payload(flow_run, branch_node, NodeType::Branch(branch));
    payload.flow_params = Some(json!({"flag": true}));

    let outcome = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
    assert!(matches!(outcome, HandlerOutput::Success { .. }));

    // Branch evaluation goes through execute_branch_step rather than
    // execute_connector_step, so the BranchResult lands in the ContextStore...
    let branch_output = store.get(flow_run, branch_node).unwrap();
    assert!(branch_output.is_some());

    // ...while no receipt key is written (branch steps are not boundary crossings).
    let key = format!("receipt:step:{}:{}", flow_run, branch_node);
    let stored_receipt = store.get_global(&key).unwrap();
    assert!(stored_receipt.is_none(), "branch steps should not produce receipts");
}
1004
1005    // T-10: Tracing Span Assertions (Sprint 8)
1006
1007    use std::io;
1008    use std::sync::Mutex;
1009
1010    use tracing_subscriber::fmt;
1011    use tracing_subscriber::prelude::*;
1012
/// Shared buffer writer for capturing tracing output in tests.
///
/// Wraps a reference-counted, mutex-guarded byte vector. `Clone` is derived,
/// which clones the inner `Arc`, so every clone appends to the same buffer.
#[derive(Clone)]
struct BufWriter(Arc<Mutex<Vec<u8>>>);

impl io::Write for BufWriter {
    /// Appends all of `buf` to the shared buffer and reports it as fully written.
    ///
    /// # Panics
    ///
    /// Panics if the buffer's mutex has been poisoned.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.lock().unwrap().extend_from_slice(buf);
        Ok(buf.len())
    }

    /// No-op: `write` stores bytes directly in memory, so nothing is pending.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
1031
1032    impl<'a> fmt::MakeWriter<'a> for BufWriter {
1033        type Writer = BufWriter;
1034        fn make_writer(&'a self) -> Self::Writer {
1035            self.clone()
1036        }
1037    }
1038
1039    /// Run a closure with a tracing subscriber that captures output, return the captured text.
1040    fn capture_tracing<F: FnOnce()>(f: F) -> String {
1041        let buf = Arc::new(Mutex::new(Vec::new()));
1042        let writer = BufWriter(Arc::clone(&buf));
1043        let subscriber = tracing_subscriber::registry().with(
1044            fmt::layer()
1045                .with_writer(writer)
1046                .with_ansi(false)
1047                .with_target(false)
1048                .with_level(true)
1049                .with_span_events(fmt::format::FmtSpan::NEW | fmt::format::FmtSpan::CLOSE),
1050        );
1051        tracing::subscriber::with_default(subscriber, f);
1052        let bytes = buf.lock().unwrap().clone();
1053        String::from_utf8(bytes).unwrap()
1054    }
1055
#[test]
fn step_run_span_has_flow_run_id_and_node_id() {
    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();
    let flow_run = FlowRunId::new();
    let delay_node = NodeId::new();

    let delay = ConnectorNode {
        connector: "delay".into(),
        params: json!({"duration_ms": 10}),
        idempotency_config: None,
    };
    let payload = make_step_payload(flow_run, delay_node, NodeType::Connector(delay));

    // Execute the step while tracing output is being captured.
    let captured = capture_tracing(|| {
        execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
    });

    // Both identifiers and the span name must appear in the formatted output.
    let flow_run_text = flow_run.to_string();
    assert!(
        captured.contains(&flow_run_text),
        "tracing output should contain flow_run_id {}, got:\n{}",
        flow_run,
        captured
    );

    let node_text = delay_node.to_string();
    assert!(
        captured.contains(&node_text),
        "tracing output should contain node_id {}, got:\n{}",
        delay_node,
        captured
    );

    assert!(
        captured.contains("step_run"),
        "tracing output should contain step_run span name, got:\n{}",
        captured
    );
}
1096
#[test]
fn step_run_span_has_step_type() {
    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();
    let flow_run = FlowRunId::new();
    let transform_node = NodeId::new();

    let identity = TransformNode {
        transform: TransformType::Identity,
        input: json!({"x": 1}),
    };
    let payload = make_step_payload(flow_run, transform_node, NodeType::Transform(identity));

    // Execute the transform step while tracing output is being captured.
    let captured = capture_tracing(|| {
        execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
    });

    // The output must mention the `step_type` field and the value `transform`.
    let has_field = captured.contains("step_type");
    assert!(has_field, "tracing output should contain step_type field, got:\n{}", captured);

    let has_value = captured.contains("transform");
    assert!(has_value, "tracing output should contain step_type=transform, got:\n{}", captured);
}
1129}