Skip to main content

worldinterface_coordinator/
step.rs

1//! Step handler — executes a single FlowSpec node.
2//!
3//! Handles three node types:
4//! - **Connector**: invokes a connector via `invoke_with_receipt()`
5//! - **Transform**: executes a pure transform via `execute_transform()`
6//! - **Branch**: evaluates a condition and writes a `BranchResult` to ContextStore
7
8use std::sync::Arc;
9
10use actionqueue_executor_local::handler::{ExecutorContext, HandlerOutput};
11use serde::{Deserialize, Serialize};
12use worldinterface_connector::execute_transform;
13use worldinterface_connector::invoke_with_receipt;
14use worldinterface_connector::{
15    CancellationToken, ConnectorError, ConnectorRegistry, InvocationContext,
16};
17use worldinterface_contextstore::{AtomicWriter, ContextStore, ContextStoreError};
18use worldinterface_core::flowspec::branch::BranchNode;
19use worldinterface_core::flowspec::{ConnectorNode, NodeType, TransformNode};
20use worldinterface_core::id::NodeId;
21use worldinterface_core::metrics::MetricsRecorder;
22use worldinterface_flowspec::id::derive_step_run_id;
23use worldinterface_flowspec::payload::StepPayload;
24
25use crate::error::{ResolveError, StepError};
26use crate::resolve::resolve_params;
27
28/// Result of branch evaluation, stored in ContextStore.
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct BranchResult {
31    /// True if the branch condition evaluated to true.
32    pub taken: bool,
33    /// The NodeId of the then-edge target.
34    pub then_target: NodeId,
35    /// The NodeId of the else-edge target (None if no else branch).
36    pub else_target: Option<NodeId>,
37}
38
39/// Execute a step, returning a HandlerOutput.
40///
41/// This is the core step execution logic, factored out of the handler for testability.
42pub fn execute_step<S: ContextStore>(
43    ctx: &ExecutorContext,
44    payload: &StepPayload,
45    registry: &ConnectorRegistry,
46    store: &Arc<S>,
47    metrics: &dyn MetricsRecorder,
48) -> HandlerOutput {
49    match step_inner(ctx, payload, registry, store, metrics) {
50        Ok(output) => output,
51        Err(e) => map_step_error(e),
52    }
53}
54
55fn step_inner<S: ContextStore>(
56    ctx: &ExecutorContext,
57    payload: &StepPayload,
58    registry: &ConnectorRegistry,
59    store: &Arc<S>,
60    metrics: &dyn MetricsRecorder,
61) -> Result<HandlerOutput, StepError> {
62    let step_type = match &payload.node_type {
63        NodeType::Connector(_) => "connector",
64        NodeType::Transform(_) => "transform",
65        NodeType::Branch(_) => "branch",
66    };
67    let _span = tracing::info_span!(
68        "step_run",
69        flow_run_id = %payload.flow_run_id,
70        node_id = %payload.node_id,
71        step_type = step_type,
72    )
73    .entered();
74
75    match &payload.node_type {
76        NodeType::Connector(connector_node) => {
77            execute_connector_step(ctx, payload, connector_node, registry, store, metrics)
78        }
79        NodeType::Transform(transform_node) => {
80            execute_transform_step(payload, transform_node, store, metrics)
81        }
82        NodeType::Branch(branch_node) => execute_branch_step(payload, branch_node, store, metrics),
83    }
84}
85
86/// Execute a connector step with crash recovery and receipt generation.
87fn execute_connector_step<S: ContextStore>(
88    ctx: &ExecutorContext,
89    payload: &StepPayload,
90    connector_node: &ConnectorNode,
91    registry: &ConnectorRegistry,
92    store: &Arc<S>,
93    metrics: &dyn MetricsRecorder,
94) -> Result<HandlerOutput, StepError> {
95    // Crash recovery: check if output already exists in ContextStore (Invariant 2 + 4)
96    if let Some(_existing) = store.get(payload.flow_run_id, payload.node_id)? {
97        tracing::info!(
98            %payload.flow_run_id, %payload.node_id,
99            "output already exists in ContextStore (idempotent retry), skipping connector invocation"
100        );
101        return Ok(HandlerOutput::success());
102    }
103
104    // Look up connector
105    let connector = registry
106        .get(&connector_node.connector)
107        .ok_or_else(|| StepError::ConnectorNotFound { name: connector_node.connector.clone() })?;
108
109    // Resolve parameters
110    let resolved_params = resolve_params(
111        &connector_node.params,
112        payload.flow_run_id,
113        payload.flow_params.as_ref(),
114        store.as_ref(),
115    )?;
116
117    // Build InvocationContext
118    let inv_ctx = build_invocation_context(ctx, payload);
119
120    // Record connector invocation metric
121    metrics.record_connector_invocation(&connector_node.connector);
122
123    // Invoke with receipt (Invariant 6)
124    let (result, receipt) = invoke_with_receipt(connector.as_ref(), &inv_ctx, &resolved_params);
125
126    match result {
127        Ok(output) => {
128            // Write output to ContextStore via AtomicWriter (Invariant 2)
129            let writer = AtomicWriter::new(Arc::clone(store));
130            writer
131                .write_and_complete(payload.flow_run_id, payload.node_id, &output, || Ok(()))
132                .map_err(|e| match e {
133                    worldinterface_contextstore::AtomicWriteError::WriteFailed(store_err) => {
134                        StepError::ContextStoreError(store_err)
135                    }
136                    worldinterface_contextstore::AtomicWriteError::CompletionFailed(e) => {
137                        StepError::ContextStoreError(ContextStoreError::StorageError(e.to_string()))
138                    }
139                })?;
140
141            // Record ContextStore write metric
142            metrics.record_contextstore_write();
143
144            // Store receipt durably (Invariant 6)
145            let receipt_key = format!("receipt:step:{}:{}", payload.flow_run_id, payload.node_id);
146            if let Ok(receipt_value) = serde_json::to_value(&receipt) {
147                let _ = store.upsert_global(&receipt_key, &receipt_value);
148            }
149
150            // Record success metrics
151            metrics.record_step_completed(
152                &connector_node.connector,
153                receipt.duration_ms as f64 / 1000.0,
154            );
155
156            Ok(HandlerOutput::success())
157        }
158        Err(e) => {
159            // Store failure receipt durably (Invariant 6 — receipts even on failure)
160            // Best-effort: failure receipt storage should not mask the original error
161            let receipt_key = format!("receipt:step:{}:{}", payload.flow_run_id, payload.node_id);
162            if let Ok(receipt_value) = serde_json::to_value(&receipt) {
163                let _ = store.upsert_global(&receipt_key, &receipt_value);
164            }
165
166            // Record failure metrics
167            metrics.record_step_failed(&connector_node.connector);
168
169            Err(StepError::ConnectorError(e))
170        }
171    }
172}
173
174/// Execute a transform step.
175fn execute_transform_step<S: ContextStore>(
176    payload: &StepPayload,
177    transform_node: &TransformNode,
178    store: &Arc<S>,
179    metrics: &dyn MetricsRecorder,
180) -> Result<HandlerOutput, StepError> {
181    // Crash recovery: check if output already exists
182    if store.get(payload.flow_run_id, payload.node_id)?.is_some() {
183        tracing::info!(
184            %payload.flow_run_id, %payload.node_id,
185            "transform output already exists (idempotent retry), skipping"
186        );
187        return Ok(HandlerOutput::success());
188    }
189
190    // Resolve transform input (may contain template references)
191    let resolved_input = resolve_params(
192        &transform_node.input,
193        payload.flow_run_id,
194        payload.flow_params.as_ref(),
195        store.as_ref(),
196    )?;
197
198    // Execute the transform
199    let output = execute_transform(&transform_node.transform, &resolved_input)?;
200
201    // Write to ContextStore
202    store.put(payload.flow_run_id, payload.node_id, &output).or_else(|e| match e {
203        ContextStoreError::AlreadyExists { .. } => Ok(()),
204        other => Err(other),
205    })?;
206
207    // Record ContextStore write metric
208    metrics.record_contextstore_write();
209
210    Ok(HandlerOutput::success())
211}
212
213/// Execute a branch step.
214fn execute_branch_step<S: ContextStore>(
215    payload: &StepPayload,
216    branch_node: &BranchNode,
217    store: &Arc<S>,
218    metrics: &dyn MetricsRecorder,
219) -> Result<HandlerOutput, StepError> {
220    // Crash recovery: check if branch result already exists
221    if store.get(payload.flow_run_id, payload.node_id)?.is_some() {
222        tracing::info!(
223            %payload.flow_run_id, %payload.node_id,
224            "branch result already exists (idempotent retry), skipping"
225        );
226        return Ok(HandlerOutput::success());
227    }
228
229    // Evaluate the branch condition
230    let taken = crate::branch_eval::evaluate_branch(
231        &branch_node.condition,
232        payload.flow_run_id,
233        payload.flow_params.as_ref(),
234        store.as_ref(),
235    )?;
236
237    let branch_result = BranchResult {
238        taken,
239        then_target: branch_node.then_edge,
240        else_target: branch_node.else_edge,
241    };
242
243    // Write BranchResult to ContextStore
244    let value = serde_json::to_value(&branch_result).map_err(|e| {
245        StepError::ContextStoreError(ContextStoreError::StorageError(e.to_string()))
246    })?;
247
248    store.put(payload.flow_run_id, payload.node_id, &value).or_else(|e| match e {
249        ContextStoreError::AlreadyExists { .. } => Ok(()),
250        other => Err(other),
251    })?;
252
253    // Record ContextStore write metric
254    metrics.record_contextstore_write();
255
256    Ok(HandlerOutput::success())
257}
258
259/// Build an InvocationContext from AQ's ExecutorContext and the StepPayload.
260fn build_invocation_context(ctx: &ExecutorContext, payload: &StepPayload) -> InvocationContext {
261    let step_run_id = derive_step_run_id(payload.flow_run_id, payload.node_id);
262
263    // Bridge cancellation: share AQ's underlying AtomicBool with UI's token.
264    // AQ's watchdog thread or dispatch loop can cancel at any time; this
265    // gives UI connectors live visibility into that signal with zero overhead.
266    let cancellation =
267        CancellationToken::from_flag(ctx.input.cancellation_context.token().cancelled_flag());
268
269    InvocationContext {
270        flow_run_id: payload.flow_run_id,
271        node_id: payload.node_id,
272        step_run_id,
273        run_id: *ctx.input.run_id.as_uuid(),
274        attempt_id: *ctx.input.attempt_id.as_uuid(),
275        attempt_number: ctx.input.metadata.attempt_number,
276        cancellation,
277    }
278}
279
280/// Map StepError to HandlerOutput.
281fn map_step_error(err: StepError) -> HandlerOutput {
282    match &err {
283        StepError::ConnectorError(ConnectorError::Retryable(_)) => {
284            HandlerOutput::retryable_failure(err.to_string())
285        }
286        StepError::ConnectorError(ConnectorError::Cancelled) => {
287            HandlerOutput::retryable_failure(err.to_string())
288        }
289        StepError::ConnectorError(ConnectorError::Terminal(_))
290        | StepError::ConnectorError(ConnectorError::InvalidParams(_)) => {
291            HandlerOutput::terminal_failure(err.to_string())
292        }
293        StepError::ResolveFailed(ResolveError::NodeOutputNotFound { .. }) => {
294            HandlerOutput::retryable_failure(err.to_string())
295        }
296        StepError::ResolveFailed(ResolveError::ContextStoreError(
297            ContextStoreError::StorageError(_),
298        )) => HandlerOutput::retryable_failure(err.to_string()),
299        StepError::ContextStoreError(ContextStoreError::StorageError(_)) => {
300            HandlerOutput::retryable_failure(err.to_string())
301        }
302        _ => HandlerOutput::terminal_failure(err.to_string()),
303    }
304}
305
306#[cfg(test)]
307mod tests {
308    use actionqueue_core::ids::{AttemptId, RunId};
309    use actionqueue_core::task::safety::SafetyLevel;
310    use actionqueue_executor_local::handler::{AttemptMetadata, CancellationContext, HandlerInput};
311    use serde_json::json;
312    use worldinterface_connector::connectors::default_registry;
313    use worldinterface_contextstore::SqliteContextStore;
314    use worldinterface_core::flowspec::branch::{BranchCondition, ParamRef};
315    use worldinterface_core::flowspec::transform::TransformType;
316    use worldinterface_core::id::FlowRunId;
317    use worldinterface_core::metrics::NoopMetricsRecorder;
318
319    use super::*;
320
321    static NOOP_METRICS: NoopMetricsRecorder = NoopMetricsRecorder;
322
323    fn make_store() -> Arc<SqliteContextStore> {
324        Arc::new(SqliteContextStore::in_memory().unwrap())
325    }
326
327    fn make_executor_context() -> ExecutorContext {
328        ExecutorContext {
329            input: HandlerInput {
330                run_id: RunId::new(),
331                attempt_id: AttemptId::new(),
332                payload: vec![],
333                metadata: AttemptMetadata {
334                    max_attempts: 3,
335                    attempt_number: 1,
336                    timeout_secs: None,
337                    safety_level: SafetyLevel::Idempotent,
338                },
339                cancellation_context: CancellationContext::new(),
340            },
341            submission: None,
342            children: None,
343        }
344    }
345
346    fn make_step_payload(
347        flow_run_id: FlowRunId,
348        node_id: NodeId,
349        node_type: NodeType,
350    ) -> StepPayload {
351        StepPayload {
352            task_type: worldinterface_flowspec::payload::TaskType::Step,
353            flow_run_id,
354            node_id,
355            node_type,
356            flow_params: None,
357        }
358    }
359
360    // T-2: Step Handler — Connector Invocation
361
362    #[test]
363    fn step_invokes_delay_connector() {
364        let store = make_store();
365        let registry = default_registry();
366        let ctx = make_executor_context();
367        let fr = FlowRunId::new();
368        let node_id = NodeId::new();
369
370        let payload = make_step_payload(
371            fr,
372            node_id,
373            NodeType::Connector(ConnectorNode {
374                connector: "delay".into(),
375                params: json!({"duration_ms": 10}),
376                idempotency_config: None,
377            }),
378        );
379
380        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
381        assert!(matches!(result, HandlerOutput::Success { .. }));
382        assert!(store.get(fr, node_id).unwrap().is_some());
383    }
384
385    #[test]
386    fn step_invokes_fs_write_connector() {
387        let dir = tempfile::tempdir().unwrap();
388        let file_path = dir.path().join("test.txt");
389        let store = make_store();
390        let registry = default_registry();
391        let ctx = make_executor_context();
392        let fr = FlowRunId::new();
393        let node_id = NodeId::new();
394
395        let payload = make_step_payload(
396            fr,
397            node_id,
398            NodeType::Connector(ConnectorNode {
399                connector: "fs.write".into(),
400                params: json!({
401                    "path": file_path.to_str().unwrap(),
402                    "content": "hello world"
403                }),
404                idempotency_config: None,
405            }),
406        );
407
408        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
409        assert!(matches!(result, HandlerOutput::Success { .. }));
410        assert_eq!(std::fs::read_to_string(&file_path).unwrap(), "hello world");
411        assert!(store.get(fr, node_id).unwrap().is_some());
412    }
413
414    #[test]
415    fn step_connector_not_found() {
416        let store = make_store();
417        let registry = default_registry();
418        let ctx = make_executor_context();
419        let fr = FlowRunId::new();
420        let node_id = NodeId::new();
421
422        let payload = make_step_payload(
423            fr,
424            node_id,
425            NodeType::Connector(ConnectorNode {
426                connector: "nonexistent".into(),
427                params: json!({}),
428                idempotency_config: None,
429            }),
430        );
431
432        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
433        assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
434    }
435
436    #[test]
437    fn step_connector_terminal_error() {
438        let dir = tempfile::tempdir().unwrap();
439        let file_path = dir.path().join("test.txt");
440        // Create file so "create" mode fails
441        std::fs::write(&file_path, "existing").unwrap();
442
443        let store = make_store();
444        let registry = default_registry();
445        let ctx = make_executor_context();
446        let fr = FlowRunId::new();
447        let node_id = NodeId::new();
448
449        let payload = make_step_payload(
450            fr,
451            node_id,
452            NodeType::Connector(ConnectorNode {
453                connector: "fs.write".into(),
454                params: json!({
455                    "path": file_path.to_str().unwrap(),
456                    "content": "new",
457                    "mode": "create"
458                }),
459                idempotency_config: None,
460            }),
461        );
462
463        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
464        assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
465    }
466
467    #[test]
468    fn step_connector_invalid_params() {
469        let store = make_store();
470        let registry = default_registry();
471        let ctx = make_executor_context();
472        let fr = FlowRunId::new();
473        let node_id = NodeId::new();
474
475        let payload = make_step_payload(
476            fr,
477            node_id,
478            NodeType::Connector(ConnectorNode {
479                connector: "delay".into(),
480                params: json!({}), // missing duration_ms
481                idempotency_config: None,
482            }),
483        );
484
485        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
486        assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
487    }
488
489    // T-3: Step Handler — Transform Execution
490
491    #[test]
492    fn step_executes_identity_transform() {
493        let store = make_store();
494        let registry = default_registry();
495        let ctx = make_executor_context();
496        let fr = FlowRunId::new();
497        let node_id = NodeId::new();
498
499        let input_val = json!({"key": "value", "num": 42});
500        let payload = make_step_payload(
501            fr,
502            node_id,
503            NodeType::Transform(TransformNode {
504                transform: TransformType::Identity,
505                input: input_val.clone(),
506            }),
507        );
508
509        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
510        assert!(matches!(result, HandlerOutput::Success { .. }));
511        let stored = store.get(fr, node_id).unwrap().unwrap();
512        assert_eq!(stored, input_val);
513    }
514
515    #[test]
516    fn step_executes_field_mapping() {
517        use worldinterface_core::flowspec::transform::{FieldMapping, FieldMappingSpec};
518
519        let store = make_store();
520        let registry = default_registry();
521        let ctx = make_executor_context();
522        let fr = FlowRunId::new();
523        let node_id = NodeId::new();
524
525        let payload = make_step_payload(
526            fr,
527            node_id,
528            NodeType::Transform(TransformNode {
529                transform: TransformType::FieldMapping(FieldMappingSpec {
530                    mappings: vec![FieldMapping { from: "a".into(), to: "b".into() }],
531                }),
532                input: json!({"a": 1}),
533            }),
534        );
535
536        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
537        assert!(matches!(result, HandlerOutput::Success { .. }));
538        let stored = store.get(fr, node_id).unwrap().unwrap();
539        assert_eq!(stored, json!({"b": 1}));
540    }
541
542    #[test]
543    fn step_transform_error_is_terminal() {
544        use worldinterface_core::flowspec::transform::{FieldMapping, FieldMappingSpec};
545
546        let store = make_store();
547        let registry = default_registry();
548        let ctx = make_executor_context();
549        let fr = FlowRunId::new();
550        let node_id = NodeId::new();
551
552        let payload = make_step_payload(
553            fr,
554            node_id,
555            NodeType::Transform(TransformNode {
556                transform: TransformType::FieldMapping(FieldMappingSpec {
557                    mappings: vec![FieldMapping { from: "nonexistent".into(), to: "out".into() }],
558                }),
559                input: json!({"a": 1}),
560            }),
561        );
562
563        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
564        assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
565    }
566
567    // T-4: Step Handler — Branch Evaluation
568
569    #[test]
570    fn step_evaluates_exists_true() {
571        let store = make_store();
572        let registry = default_registry();
573        let ctx = make_executor_context();
574        let fr = FlowRunId::new();
575        let node_id = NodeId::new();
576        let then_target = NodeId::new();
577
578        let mut payload = make_step_payload(
579            fr,
580            node_id,
581            NodeType::Branch(BranchNode {
582                condition: BranchCondition::Exists(ParamRef::FlowParam { path: "flag".into() }),
583                then_edge: then_target,
584                else_edge: None,
585            }),
586        );
587        payload.flow_params = Some(json!({"flag": "present"}));
588
589        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
590        assert!(matches!(result, HandlerOutput::Success { .. }));
591
592        let stored = store.get(fr, node_id).unwrap().unwrap();
593        let branch_result: BranchResult = serde_json::from_value(stored).unwrap();
594        assert!(branch_result.taken);
595        assert_eq!(branch_result.then_target, then_target);
596    }
597
598    #[test]
599    fn step_evaluates_exists_false() {
600        let store = make_store();
601        let registry = default_registry();
602        let ctx = make_executor_context();
603        let fr = FlowRunId::new();
604        let node_id = NodeId::new();
605        let then_target = NodeId::new();
606        let else_target = NodeId::new();
607
608        let mut payload = make_step_payload(
609            fr,
610            node_id,
611            NodeType::Branch(BranchNode {
612                condition: BranchCondition::Exists(ParamRef::FlowParam { path: "flag".into() }),
613                then_edge: then_target,
614                else_edge: Some(else_target),
615            }),
616        );
617        payload.flow_params = Some(json!({"flag": null}));
618
619        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
620        assert!(matches!(result, HandlerOutput::Success { .. }));
621
622        let stored = store.get(fr, node_id).unwrap().unwrap();
623        let branch_result: BranchResult = serde_json::from_value(stored).unwrap();
624        assert!(!branch_result.taken);
625    }
626
627    #[test]
628    fn step_evaluates_equals_true() {
629        let store = make_store();
630        let registry = default_registry();
631        let ctx = make_executor_context();
632        let fr = FlowRunId::new();
633        let node_id = NodeId::new();
634        let then_target = NodeId::new();
635
636        let mut payload = make_step_payload(
637            fr,
638            node_id,
639            NodeType::Branch(BranchNode {
640                condition: BranchCondition::Equals {
641                    left: ParamRef::FlowParam { path: "status".into() },
642                    right: json!("ok"),
643                },
644                then_edge: then_target,
645                else_edge: None,
646            }),
647        );
648        payload.flow_params = Some(json!({"status": "ok"}));
649
650        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
651        assert!(matches!(result, HandlerOutput::Success { .. }));
652
653        let stored = store.get(fr, node_id).unwrap().unwrap();
654        let branch_result: BranchResult = serde_json::from_value(stored).unwrap();
655        assert!(branch_result.taken);
656    }
657
658    #[test]
659    fn step_expression_not_implemented() {
660        let store = make_store();
661        let registry = default_registry();
662        let ctx = make_executor_context();
663        let fr = FlowRunId::new();
664        let node_id = NodeId::new();
665        let then_target = NodeId::new();
666
667        let payload = make_step_payload(
668            fr,
669            node_id,
670            NodeType::Branch(BranchNode {
671                condition: BranchCondition::Expression("1 + 1".into()),
672                then_edge: then_target,
673                else_edge: None,
674            }),
675        );
676
677        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
678        assert!(matches!(result, HandlerOutput::TerminalFailure { .. }));
679    }
680
681    // T-5: Step Handler — Crash Recovery
682
683    #[test]
684    fn step_skips_invocation_if_output_exists() {
685        let store = make_store();
686        let registry = default_registry();
687        let ctx = make_executor_context();
688        let fr = FlowRunId::new();
689        let node_id = NodeId::new();
690
691        // Pre-write output
692        store.put(fr, node_id, &json!({"pre": "existing"})).unwrap();
693
694        let payload = make_step_payload(
695            fr,
696            node_id,
697            NodeType::Connector(ConnectorNode {
698                connector: "delay".into(),
699                params: json!({"duration_ms": 10}),
700                idempotency_config: None,
701            }),
702        );
703
704        let result = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
705        assert!(matches!(result, HandlerOutput::Success { .. }));
706        // Output should still be the pre-existing one
707        let stored = store.get(fr, node_id).unwrap().unwrap();
708        assert_eq!(stored, json!({"pre": "existing"}));
709    }
710
711    // T-11: InvocationContext Construction
712
713    #[test]
714    fn context_has_correct_ids() {
715        let ctx = make_executor_context();
716        let fr = FlowRunId::new();
717        let node_id = NodeId::new();
718        let payload = make_step_payload(
719            fr,
720            node_id,
721            NodeType::Connector(ConnectorNode {
722                connector: "delay".into(),
723                params: json!({"duration_ms": 0}),
724                idempotency_config: None,
725            }),
726        );
727
728        let inv_ctx = build_invocation_context(&ctx, &payload);
729        assert_eq!(inv_ctx.flow_run_id, fr);
730        assert_eq!(inv_ctx.node_id, node_id);
731        assert_eq!(inv_ctx.run_id, *ctx.input.run_id.as_uuid());
732        assert_eq!(inv_ctx.attempt_id, *ctx.input.attempt_id.as_uuid());
733        assert_eq!(inv_ctx.attempt_number, 1);
734    }
735
736    #[test]
737    fn cancellation_token_bridges_correctly() {
738        let ctx = {
739            let c = make_executor_context();
740            c.input.cancellation_context.cancel();
741            c
742        };
743
744        let payload = make_step_payload(
745            FlowRunId::new(),
746            NodeId::new(),
747            NodeType::Connector(ConnectorNode {
748                connector: "delay".into(),
749                params: json!({}),
750                idempotency_config: None,
751            }),
752        );
753
754        let inv_ctx = build_invocation_context(&ctx, &payload);
755        assert!(inv_ctx.cancellation.is_cancelled());
756    }
757
758    // T-10: Receipt Generation (Invariant 6)
759
760    #[test]
761    fn connector_step_produces_receipt() {
762        // invoke_with_receipt always produces a receipt, even on success.
763        // Verify by calling it directly with a real connector.
764        let registry = default_registry();
765        let connector = registry.get("delay").unwrap();
766        let inv_ctx = InvocationContext {
767            flow_run_id: FlowRunId::new(),
768            node_id: NodeId::new(),
769            step_run_id: worldinterface_flowspec::id::derive_step_run_id(
770                FlowRunId::new(),
771                NodeId::new(),
772            ),
773            run_id: uuid::Uuid::new_v4(),
774            attempt_id: uuid::Uuid::new_v4(),
775            attempt_number: 1,
776            cancellation: CancellationToken::new(),
777        };
778        let params = json!({"duration_ms": 10});
779        let (result, receipt) = invoke_with_receipt(connector.as_ref(), &inv_ctx, &params);
780        assert!(result.is_ok());
781        assert_eq!(receipt.status, worldinterface_core::receipt::ReceiptStatus::Success);
782        assert_eq!(receipt.connector, "delay");
783    }
784
785    #[test]
786    fn connector_step_produces_receipt_on_failure() {
787        let registry = default_registry();
788        let connector = registry.get("delay").unwrap();
789        let inv_ctx = InvocationContext {
790            flow_run_id: FlowRunId::new(),
791            node_id: NodeId::new(),
792            step_run_id: worldinterface_flowspec::id::derive_step_run_id(
793                FlowRunId::new(),
794                NodeId::new(),
795            ),
796            run_id: uuid::Uuid::new_v4(),
797            attempt_id: uuid::Uuid::new_v4(),
798            attempt_number: 1,
799            cancellation: CancellationToken::new(),
800        };
801        let params = json!({}); // missing duration_ms → InvalidParams
802        let (result, receipt) = invoke_with_receipt(connector.as_ref(), &inv_ctx, &params);
803        assert!(result.is_err());
804        assert_eq!(receipt.status, worldinterface_core::receipt::ReceiptStatus::Failure);
805        assert_eq!(receipt.connector, "delay");
806        assert!(receipt.error.is_some());
807    }
808
/// Running an identity transform through `execute_step` succeeds and leaves
/// the node's output in the ContextStore.
///
/// Transform nodes are executed via `execute_transform`, not
/// `invoke_with_receipt`, so no connector invocation (and hence no receipt)
/// is involved. This test only asserts the success result and the stored
/// output.
#[test]
fn transform_step_does_not_produce_receipt() {
    let flow_run = FlowRunId::new();
    let node = NodeId::new();
    let identity = TransformNode {
        transform: TransformType::Identity,
        input: json!({"key": "value"}),
    };
    let payload = make_step_payload(flow_run, node, NodeType::Transform(identity));

    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();

    let outcome = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
    assert!(matches!(outcome, HandlerOutput::Success { .. }));

    // The transform ran: its output is present under (flow run, node).
    let stored_output = store.get(flow_run, node).unwrap();
    assert!(stored_output.is_some());
}
836
837    // T-4: Durable Receipt Storage (Sprint 8)
838
/// A successful connector step persists its receipt as a ContextStore global
/// under `receipt:step:<flow_run_id>:<node_id>`.
#[test]
fn connector_step_stores_receipt_on_success() {
    let flow_run = FlowRunId::new();
    let node = NodeId::new();
    let delay_node = ConnectorNode {
        connector: "delay".into(),
        params: json!({"duration_ms": 10}),
        idempotency_config: None,
    };
    let payload = make_step_payload(flow_run, node, NodeType::Connector(delay_node));

    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();

    let outcome = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
    assert!(matches!(outcome, HandlerOutput::Success { .. }));

    // Look the receipt up in the store's globals and inspect its fields.
    let key = format!("receipt:step:{}:{}", flow_run, node);
    let stored = match store.get_global(&key).unwrap() {
        Some(value) => value,
        None => panic!("receipt should be stored on success"),
    };
    let connector_field = stored.get("connector").and_then(|v| v.as_str());
    let status_field = stored.get("status").and_then(|v| v.as_str());
    assert_eq!(connector_field, Some("delay"));
    assert_eq!(status_field, Some("success"));
}
868
/// A connector step that fails terminally still persists a failure receipt
/// under `receipt:step:<flow_run_id>:<node_id>`.
#[test]
fn connector_step_stores_receipt_on_failure() {
    let flow_run = FlowRunId::new();
    let node = NodeId::new();
    let broken_delay = ConnectorNode {
        connector: "delay".into(),
        // missing duration_ms → InvalidParams → failure
        params: json!({}),
        idempotency_config: None,
    };
    let payload = make_step_payload(flow_run, node, NodeType::Connector(broken_delay));

    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();

    let outcome = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
    assert!(matches!(outcome, HandlerOutput::TerminalFailure { .. }));

    // The receipt write is best-effort, but it is expected to land here.
    let key = format!("receipt:step:{}:{}", flow_run, node);
    let stored = match store.get_global(&key).unwrap() {
        Some(value) => value,
        None => panic!("receipt should be stored even on failure"),
    };
    let status_field = stored.get("status").and_then(|v| v.as_str());
    assert_eq!(status_field, Some("failure"));
    let error_field = stored.get("error");
    assert!(error_field.is_some(), "failure receipt should have error field");
}
898
/// The stored receipt's `flow_run_id` field equals the string form of the
/// flow run the step was executed under.
#[test]
fn receipt_has_correct_flow_run_id() {
    let flow_run = FlowRunId::new();
    let node = NodeId::new();
    let delay_node = ConnectorNode {
        connector: "delay".into(),
        params: json!({"duration_ms": 10}),
        idempotency_config: None,
    };
    let payload = make_step_payload(flow_run, node, NodeType::Connector(delay_node));

    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();

    execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);

    let key = format!("receipt:step:{}:{}", flow_run, node);
    let stored = store.get_global(&key).unwrap().unwrap();

    let expected = flow_run.to_string();
    let recorded = stored.get("flow_run_id").and_then(|v| v.as_str());
    assert_eq!(recorded, Some(expected.as_str()));
}
926
/// The stored receipt's `duration_ms` is at least the 50 ms delay that the
/// `delay` connector was asked for.
#[test]
fn receipt_has_nonzero_duration() {
    let flow_run = FlowRunId::new();
    let node = NodeId::new();
    let delay_node = ConnectorNode {
        connector: "delay".into(),
        params: json!({"duration_ms": 50}),
        idempotency_config: None,
    };
    let payload = make_step_payload(flow_run, node, NodeType::Connector(delay_node));

    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();

    execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);

    let key = format!("receipt:step:{}:{}", flow_run, node);
    let stored = store.get_global(&key).unwrap().unwrap();

    let elapsed_ms = stored.get("duration_ms").and_then(|v| v.as_u64()).unwrap();
    assert!(elapsed_ms >= 50, "duration should be >= 50ms, got {}ms", elapsed_ms);
}
952
/// Executing a transform step leaves no `receipt:step:<flow_run_id>:<node_id>`
/// entry in the ContextStore globals.
#[test]
fn transform_step_does_not_store_receipt() {
    let flow_run = FlowRunId::new();
    let node = NodeId::new();
    let identity = TransformNode {
        transform: TransformType::Identity,
        input: json!({"key": "value"}),
    };
    let payload = make_step_payload(flow_run, node, NodeType::Transform(identity));

    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();

    execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);

    // Transform steps should NOT store receipts (no boundary crossing).
    let key = format!("receipt:step:{}:{}", flow_run, node);
    let stored = store.get_global(&key).unwrap();
    assert!(stored.is_none(), "transform steps should not produce receipts");
}
977
/// A branch step succeeds and writes its result to the ContextStore, but
/// leaves no `receipt:step:<flow_run_id>:<node_id>` global behind.
#[test]
fn branch_step_does_not_produce_receipt() {
    let flow_run = FlowRunId::new();
    let node = NodeId::new();
    let then_target = NodeId::new();

    let branch = BranchNode {
        condition: BranchCondition::Exists(ParamRef::FlowParam { path: "flag".into() }),
        then_edge: then_target,
        else_edge: None,
    };
    let mut payload = make_step_payload(flow_run, node, NodeType::Branch(branch));
    // Supply the flow parameter that the `Exists` condition refers to.
    payload.flow_params = Some(json!({"flag": true}));

    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();

    let outcome = execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
    assert!(matches!(outcome, HandlerOutput::Success { .. }));

    // The branch evaluation left an entry under (flow run, node).
    let branch_output = store.get(flow_run, node).unwrap();
    assert!(branch_output.is_some());

    // Branch steps are not boundary crossings, so no receipt key may exist.
    let key = format!("receipt:step:{}:{}", flow_run, node);
    let stored_receipt = store.get_global(&key).unwrap();
    assert!(stored_receipt.is_none(), "branch steps should not produce receipts");
}
1012
1013    // T-10: Tracing Span Assertions (Sprint 8)
1014
1015    use std::io;
1016    use std::sync::Mutex;
1017
1018    use tracing_subscriber::fmt;
1019    use tracing_subscriber::prelude::*;
1020
/// Shared buffer writer for capturing tracing output in tests.
///
/// Wraps a reference-counted, mutex-guarded byte vector. Cloning the writer
/// clones the `Arc`, so every clone appends to the same underlying buffer.
#[derive(Clone)]
struct BufWriter(Arc<Mutex<Vec<u8>>>);

impl io::Write for BufWriter {
    /// Appends all of `buf` to the shared buffer and reports it as fully
    /// written. Panics if the mutex is poisoned.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.lock().unwrap().extend_from_slice(buf);
        Ok(buf.len())
    }

    /// Nothing is buffered outside the shared vector, so flushing is a no-op.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
1039
1040    impl<'a> fmt::MakeWriter<'a> for BufWriter {
1041        type Writer = BufWriter;
1042        fn make_writer(&'a self) -> Self::Writer {
1043            self.clone()
1044        }
1045    }
1046
1047    /// Run a closure with a tracing subscriber that captures output, return the captured text.
1048    fn capture_tracing<F: FnOnce()>(f: F) -> String {
1049        let buf = Arc::new(Mutex::new(Vec::new()));
1050        let writer = BufWriter(Arc::clone(&buf));
1051        let subscriber = tracing_subscriber::registry().with(
1052            fmt::layer()
1053                .with_writer(writer)
1054                .with_ansi(false)
1055                .with_target(false)
1056                .with_level(true)
1057                .with_span_events(fmt::format::FmtSpan::NEW | fmt::format::FmtSpan::CLOSE),
1058        );
1059        tracing::subscriber::with_default(subscriber, f);
1060        let bytes = buf.lock().unwrap().clone();
1061        String::from_utf8(bytes).unwrap()
1062    }
1063
/// The tracing output captured while a connector step runs mentions the flow
/// run id, the node id, and the `step_run` span name.
#[test]
fn step_run_span_has_flow_run_id_and_node_id() {
    let fr = FlowRunId::new();
    let node_id = NodeId::new();
    let delay_node = ConnectorNode {
        connector: "delay".into(),
        params: json!({"duration_ms": 10}),
        idempotency_config: None,
    };
    let payload = make_step_payload(fr, node_id, NodeType::Connector(delay_node));

    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();

    let captured = capture_tracing(|| {
        execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
    });

    let flow_run_text = fr.to_string();
    assert!(
        captured.contains(flow_run_text.as_str()),
        "tracing output should contain flow_run_id {}, got:\n{}",
        fr,
        captured
    );

    let node_text = node_id.to_string();
    assert!(
        captured.contains(node_text.as_str()),
        "tracing output should contain node_id {}, got:\n{}",
        node_id,
        captured
    );

    assert!(
        captured.contains("step_run"),
        "tracing output should contain step_run span name, got:\n{}",
        captured
    );
}
1104
/// The tracing output captured while a transform step runs contains both the
/// `step_type` field name and the text `transform`.
#[test]
fn step_run_span_has_step_type() {
    let flow_run = FlowRunId::new();
    let node = NodeId::new();
    let identity = TransformNode {
        transform: TransformType::Identity,
        input: json!({"x": 1}),
    };
    let payload = make_step_payload(flow_run, node, NodeType::Transform(identity));

    let store = make_store();
    let registry = default_registry();
    let ctx = make_executor_context();

    let captured = capture_tracing(|| {
        execute_step(&ctx, &payload, &registry, &store, &NOOP_METRICS);
    });

    let has_field_name = captured.contains("step_type");
    assert!(
        has_field_name,
        "tracing output should contain step_type field, got:\n{}",
        captured
    );

    let has_field_value = captured.contains("transform");
    assert!(
        has_field_value,
        "tracing output should contain step_type=transform, got:\n{}",
        captured
    );
}
1137}