Skip to main content

pureflow_wasm/
lib.rs

1//! Wasmtime-backed batch adapter boundary for Pureflow.
2//!
3//! The crate owns the Component Model/WIT ABI and keeps Wasmtime types out of
4//! `pureflow-core`. Guest components implement `pureflow:batch/pureflow-node`
5//! from `wit/pureflow-batch.wit`; the host remains responsible for output port
6//! validation before packets are sent through `PortsOut`.
7
8use std::{
9    num::NonZeroU32,
10    sync::{
11        Arc,
12        atomic::{AtomicBool, Ordering},
13    },
14    thread::{self, JoinHandle},
15    time::Duration,
16};
17
18use pureflow_core::{
19    BatchExecutor, BatchInputs, BatchOutputs, PureflowError, PacketPayload, PortPacket, Result,
20    capability::{CapabilityValidationError, NodeCapabilities},
21    context::{CancellationRequest, CancellationToken, ExecutionAttempt, ExecutionMetadata},
22    message::{MessageEndpoint, MessageMetadata, MessageRoute},
23};
24use pureflow_types::{ExecutionId, MessageId, NodeId, PortId, WorkflowId};
25use serde_json::Value;
26use wasmtime::{
27    Config, Engine, Store,
28    component::{Component, ComponentExportIndex, Func, Instance, Linker, Val},
29};
30
/// Versioned WIT package name that Pureflow WASM batch guests implement.
pub const WIT_PACKAGE: &str = "pureflow:batch@0.1.0";

/// Name of the WIT world that Pureflow WASM batch guests export.
pub const WIT_WORLD: &str = "pureflow-node";
36
/// Fuel budget used when the caller does not supply one.
const DEFAULT_GUEST_FUEL: u64 = 100_000_000;
/// Epoch deadline (in ticks) used when the caller does not supply one.
const DEFAULT_CANCELLATION_EPOCH_DEADLINE: u64 = 1;
/// How often the cancellation watcher polls when the caller does not say.
const DEFAULT_CANCELLATION_POLL_INTERVAL: Duration = Duration::from_millis(1);

/// Per-invocation execution limits for a Wasmtime guest.
///
/// Bundles the fuel budget, the epoch deadline, and the polling interval of
/// the cancellation watcher. Values are plain data; nothing is validated here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WasmtimeExecutionLimits {
    fuel: u64,
    cancellation_epoch_deadline: u64,
    cancellation_poll_interval: Duration,
}

impl WasmtimeExecutionLimits {
    /// Build limits with the given fuel budget and default cancellation
    /// settings.
    #[must_use]
    pub const fn new(fuel: u64) -> Self {
        let cancellation_epoch_deadline: u64 = DEFAULT_CANCELLATION_EPOCH_DEADLINE;
        let cancellation_poll_interval: Duration = DEFAULT_CANCELLATION_POLL_INTERVAL;
        Self {
            fuel,
            cancellation_epoch_deadline,
            cancellation_poll_interval,
        }
    }

    /// Fuel units granted to a single guest invocation.
    #[must_use]
    pub const fn fuel(&self) -> u64 {
        self.fuel
    }

    /// Epoch deadline, in ticks, configured on the store of each invocation.
    #[must_use]
    pub const fn cancellation_epoch_deadline(&self) -> u64 {
        self.cancellation_epoch_deadline
    }

    /// Interval at which the synchronous cancellation watcher polls.
    #[must_use]
    pub const fn cancellation_poll_interval(&self) -> Duration {
        self.cancellation_poll_interval
    }

    /// Copy of these limits with `ticks` as the epoch deadline.
    #[must_use]
    pub const fn with_cancellation_epoch_deadline(self, ticks: u64) -> Self {
        Self {
            cancellation_epoch_deadline: ticks,
            ..self
        }
    }

    /// Copy of these limits with `interval` as the watcher poll interval.
    #[must_use]
    pub const fn with_cancellation_poll_interval(self, interval: Duration) -> Self {
        Self {
            cancellation_poll_interval: interval,
            ..self
        }
    }
}

impl Default for WasmtimeExecutionLimits {
    /// Default limits: the default fuel budget with default cancellation
    /// settings.
    fn default() -> Self {
        Self::new(DEFAULT_GUEST_FUEL)
    }
}
98
99/// Wasmtime component prepared for Pureflow batch execution.
100pub struct WasmtimeBatchComponent {
101    engine: Engine,
102    component: Component,
103    limits: WasmtimeExecutionLimits,
104}
105
106impl WasmtimeBatchComponent {
107    /// Compile a guest component from bytes.
108    ///
109    /// # Errors
110    ///
111    /// Returns an error if Wasmtime cannot configure the engine or compile the
112    /// supplied component bytes.
113    pub fn from_component_bytes(bytes: impl AsRef<[u8]>) -> Result<Self> {
114        Self::from_component_bytes_with_limits(bytes, WasmtimeExecutionLimits::default())
115    }
116
117    /// Compile a guest component from bytes with explicit execution limits.
118    ///
119    /// # Errors
120    ///
121    /// Returns an error if Wasmtime cannot configure the engine or compile the
122    /// supplied component bytes.
123    pub fn from_component_bytes_with_limits(
124        bytes: impl AsRef<[u8]>,
125        limits: WasmtimeExecutionLimits,
126    ) -> Result<Self> {
127        let engine: Engine = component_engine()?;
128        let component: Component =
129            Component::from_binary(&engine, bytes.as_ref()).map_err(|err: wasmtime::Error| {
130                PureflowError::execution(format!("failed to compile component: {err}"))
131            })?;
132
133        Ok(Self {
134            engine,
135            component,
136            limits,
137        })
138    }
139
140    /// Compile a guest component after validating the WASM capability boundary.
141    ///
142    /// # Errors
143    ///
144    /// Returns an error if the capability descriptor declares effects that the
145    /// current import-free WASM world cannot enforce, or if component
146    /// compilation fails.
147    pub fn from_component_bytes_with_capabilities(
148        bytes: impl AsRef<[u8]>,
149        capabilities: &NodeCapabilities,
150    ) -> Result<Self> {
151        Self::from_component_bytes_with_capabilities_and_limits(
152            bytes,
153            capabilities,
154            WasmtimeExecutionLimits::default(),
155        )
156    }
157
158    /// Compile a guest component after validating the WASM capability boundary
159    /// and applying explicit execution limits.
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if the capability descriptor declares effects that the
164    /// current import-free WASM world cannot enforce, or if component
165    /// compilation fails.
166    pub fn from_component_bytes_with_capabilities_and_limits(
167        bytes: impl AsRef<[u8]>,
168        capabilities: &NodeCapabilities,
169        limits: WasmtimeExecutionLimits,
170    ) -> Result<Self> {
171        validate_wasm_capabilities(capabilities)?;
172        Self::from_component_bytes_with_limits(bytes, limits)
173    }
174
175    /// Execution limits used for each guest invocation.
176    #[must_use]
177    pub const fn limits(&self) -> WasmtimeExecutionLimits {
178        self.limits
179    }
180
181    /// Instantiate and invoke the guest component with one batch.
182    ///
183    /// # Errors
184    ///
185    /// Returns an error if the component cannot instantiate, the guest traps,
186    /// or the guest returns malformed Pureflow data.
187    pub fn invoke(&self, inputs: &BatchInputs) -> Result<BatchOutputs> {
188        self.invoke_with_cancellation(inputs, &CancellationToken::active())
189    }
190
191    /// Instantiate and invoke the guest component with one batch, interrupting
192    /// Wasmtime execution if cancellation is requested while the synchronous
193    /// guest call is in progress.
194    ///
195    /// # Errors
196    ///
197    /// Returns an error if cancellation is already requested, the component
198    /// cannot instantiate, the guest traps or exceeds its fuel budget, or the
199    /// guest returns malformed Pureflow data.
200    pub fn invoke_with_cancellation(
201        &self,
202        inputs: &BatchInputs,
203        cancellation: &CancellationToken,
204    ) -> Result<BatchOutputs> {
205        if let Some(request) = cancellation.request() {
206            return Err(PureflowError::cancelled(request.reason()));
207        }
208
209        let linker: Linker<()> = Linker::new(&self.engine);
210        let mut store: Store<()> = Store::new(&self.engine, ());
211        store.set_epoch_deadline(self.limits.cancellation_epoch_deadline());
212        store
213            .set_fuel(self.limits.fuel())
214            .map_err(|err: wasmtime::Error| {
215                PureflowError::execution(format!("failed to configure guest fuel: {err}"))
216            })?;
217        let watcher: CancellationWatcher = CancellationWatcher::spawn(
218            self.engine.clone(),
219            cancellation.clone(),
220            self.limits.cancellation_poll_interval(),
221        )?;
222        let instance: Instance =
223            linker
224                .instantiate(&mut store, &self.component)
225                .map_err(|err: wasmtime::Error| {
226                    PureflowError::execution(format!("failed to instantiate component: {err}"))
227                })?;
228        let batch_index: ComponentExportIndex = instance
229            .get_export_index(&mut store, None, "pureflow:batch/batch@0.1.0")
230            .ok_or_else(|| {
231                PureflowError::execution("component does not export pureflow:batch/batch@0.1.0")
232            })?;
233        let invoke_index: ComponentExportIndex = instance
234            .get_export_index(&mut store, Some(&batch_index), "invoke")
235            .ok_or_else(|| PureflowError::execution("component does not export batch.invoke"))?;
236        let invoke: Func = instance
237            .get_func(&mut store, invoke_index)
238            .ok_or_else(|| PureflowError::execution("batch.invoke export is not a function"))?;
239
240        let params: [Val; 1] = [batch_inputs_to_val(inputs)?];
241        let mut results: [Val; 1] = [Val::Bool(false)];
242        let call_result: std::result::Result<(), wasmtime::Error> =
243            invoke.call(&mut store, &params, &mut results);
244        let interrupted: bool = watcher.finish();
245        if interrupted {
246            let reason: String = cancellation.request().map_or_else(
247                || String::from("wasm guest invocation cancelled"),
248                |request: CancellationRequest| request.reason().to_owned(),
249            );
250            return Err(PureflowError::cancelled(reason));
251        }
252        let remaining_fuel: Option<u64> = store.get_fuel().ok();
253        call_result.map_err(|err: wasmtime::Error| {
254            map_guest_call_error(&err, self.limits, remaining_fuel)
255        })?;
256
257        let [result]: [Val; 1] = results;
258        batch_outputs_from_result_val(result)
259    }
260}
261
262impl BatchExecutor for WasmtimeBatchComponent {
263    fn invoke(&self, inputs: BatchInputs) -> Result<BatchOutputs> {
264        Self::invoke(self, &inputs)
265    }
266}
267
268/// Validate a capability descriptor for the current import-free WASM world.
269///
270/// # Errors
271///
272/// Returns an error if the descriptor declares any external effect capability.
273pub fn validate_wasm_capabilities(capabilities: &NodeCapabilities) -> Result<()> {
274    if let Some(effect) = capabilities.effects().first() {
275        return Err(CapabilityValidationError::UnenforceableEffectCapability {
276            node_id: capabilities.node_id().clone(),
277            effect: *effect,
278        }
279        .into());
280    }
281
282    Ok(())
283}
284
285/// Convert Pureflow batch inputs to the WIT-facing ordered port batch shape.
286///
287/// # Errors
288///
289/// Returns an error if a payload cannot be represented by WIT ABI `0.1.0`.
290pub fn to_wit_port_batches(inputs: &BatchInputs) -> Result<Vec<WitPortBatch>> {
291    inputs
292        .packets_by_port()
293        .iter()
294        .map(|(port_id, packets): (&PortId, &Vec<PortPacket>)| {
295            Ok(WitPortBatch {
296                port_id: port_id.to_string(),
297                packets: packets
298                    .iter()
299                    .map(to_wit_packet)
300                    .collect::<Result<Vec<_>>>()?,
301            })
302        })
303        .collect()
304}
305
306/// Convert WIT-facing ordered port batches back to Pureflow batch outputs.
307///
308/// # Errors
309///
310/// Returns an error if a port identifier or packet metadata identifier fails
311/// Pureflow validation, or if a control payload is not valid JSON.
312pub fn from_wit_port_batches(port_batches: Vec<WitPortBatch>) -> Result<BatchOutputs> {
313    let mut outputs: BatchOutputs = BatchOutputs::new();
314    for port_batch in port_batches {
315        let port_id: PortId = PortId::new(port_batch.port_id)?;
316        for packet in port_batch.packets {
317            outputs.push(port_id.clone(), from_wit_packet(packet)?);
318        }
319    }
320
321    Ok(outputs)
322}
323
324/// WIT-facing port batch representation.
325#[derive(Debug, Clone, PartialEq, Eq)]
326pub struct WitPortBatch {
327    /// Port identifier.
328    pub port_id: String,
329    /// Packets for the port, preserving batch order.
330    pub packets: Vec<WitPacket>,
331}
332
333/// WIT-facing packet representation.
334#[derive(Debug, Clone, PartialEq, Eq)]
335pub struct WitPacket {
336    /// Message metadata.
337    pub metadata: pureflow_core::message::MessageMetadata,
338    /// Packet payload.
339    pub payload: WitPayload,
340}
341
/// Payload of a [`WitPacket`] in the shape exchanged over the WIT boundary.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WitPayload {
    /// Raw byte payload.
    Bytes(Vec<u8>),
    /// Control payload carried as a JSON-encoded string.
    Control(String),
}
350
351fn component_engine() -> Result<Engine> {
352    let mut config: Config = Config::new();
353    config.wasm_component_model(true);
354    config.epoch_interruption(true);
355    config.consume_fuel(true);
356    Engine::new(&config).map_err(|err: wasmtime::Error| {
357        PureflowError::execution(format!("failed to create Wasmtime engine: {err}"))
358    })
359}
360
361struct CancellationWatcher {
362    complete: Arc<AtomicBool>,
363    interrupted: Arc<AtomicBool>,
364    thread: Option<JoinHandle<()>>,
365}
366
367impl CancellationWatcher {
368    fn spawn(engine: Engine, cancellation: CancellationToken, interval: Duration) -> Result<Self> {
369        let complete: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
370        let interrupted: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
371        let thread_complete: Arc<AtomicBool> = Arc::clone(&complete);
372        let thread_interrupted: Arc<AtomicBool> = Arc::clone(&interrupted);
373        let thread: JoinHandle<()> = thread::Builder::new()
374            .name(String::from("pureflow-wasm-cancellation"))
375            .spawn(move || {
376                while !thread_complete.load(Ordering::Acquire) {
377                    if cancellation.is_cancelled() {
378                        thread_interrupted.store(true, Ordering::Release);
379                        engine.increment_epoch();
380                        break;
381                    }
382                    thread::sleep(interval);
383                }
384            })
385            .map_err(|err: std::io::Error| {
386                PureflowError::execution(format!("failed to start WASM cancellation watcher: {err}"))
387            })?;
388
389        Ok(Self {
390            complete,
391            interrupted,
392            thread: Some(thread),
393        })
394    }
395
396    fn finish(mut self) -> bool {
397        self.complete.store(true, Ordering::Release);
398        if let Some(thread) = self.thread.take() {
399            let _ = thread.join();
400        }
401        self.interrupted.load(Ordering::Acquire)
402    }
403}
404
405impl Drop for CancellationWatcher {
406    fn drop(&mut self) {
407        self.complete.store(true, Ordering::Release);
408        if let Some(thread) = self.thread.take() {
409            let _ = thread.join();
410        }
411    }
412}
413
414fn map_guest_call_error(
415    err: &wasmtime::Error,
416    limits: WasmtimeExecutionLimits,
417    remaining_fuel: Option<u64>,
418) -> PureflowError {
419    let message: String = err.to_string();
420    if remaining_fuel == Some(0) || message.to_ascii_lowercase().contains("fuel") {
421        PureflowError::execution(format!(
422            "guest exceeded Wasmtime fuel limit of {} units",
423            limits.fuel()
424        ))
425    } else {
426        PureflowError::execution(format!("guest invoke failed: {err}"))
427    }
428}
429
430fn batch_inputs_to_val(inputs: &BatchInputs) -> Result<Val> {
431    Ok(Val::List(
432        to_wit_port_batches(inputs)?
433            .into_iter()
434            .map(port_batch_to_val)
435            .collect(),
436    ))
437}
438
439fn port_batch_to_val(port_batch: WitPortBatch) -> Val {
440    Val::Record(vec![
441        ("port-id".to_owned(), Val::String(port_batch.port_id)),
442        (
443            "packets".to_owned(),
444            Val::List(port_batch.packets.into_iter().map(packet_to_val).collect()),
445        ),
446    ])
447}
448
449fn packet_to_val(packet: WitPacket) -> Val {
450    Val::Record(vec![
451        ("metadata".to_owned(), metadata_to_val(&packet.metadata)),
452        ("payload".to_owned(), payload_to_val(packet.payload)),
453    ])
454}
455
456fn metadata_to_val(metadata: &MessageMetadata) -> Val {
457    Val::Record(vec![
458        (
459            "message-id".to_owned(),
460            Val::String(metadata.message_id().to_string()),
461        ),
462        (
463            "workflow-id".to_owned(),
464            Val::String(metadata.workflow_id().to_string()),
465        ),
466        (
467            "execution".to_owned(),
468            Val::Record(vec![
469                (
470                    "execution-id".to_owned(),
471                    Val::String(metadata.execution().execution_id().to_string()),
472                ),
473                (
474                    "attempt".to_owned(),
475                    Val::U32(metadata.execution().attempt().get()),
476                ),
477            ]),
478        ),
479        ("route".to_owned(), route_to_val(metadata.route())),
480    ])
481}
482
483fn route_to_val(route: &MessageRoute) -> Val {
484    Val::Record(vec![
485        (
486            "source".to_owned(),
487            Val::Option(
488                route
489                    .source()
490                    .map(|source: &MessageEndpoint| Box::new(endpoint_to_val(source))),
491            ),
492        ),
493        ("target".to_owned(), endpoint_to_val(route.target())),
494    ])
495}
496
497fn endpoint_to_val(endpoint: &MessageEndpoint) -> Val {
498    Val::Record(vec![
499        (
500            "node-id".to_owned(),
501            Val::String(endpoint.node_id().to_string()),
502        ),
503        (
504            "port-id".to_owned(),
505            Val::String(endpoint.port_id().to_string()),
506        ),
507    ])
508}
509
510fn payload_to_val(payload: WitPayload) -> Val {
511    match payload {
512        WitPayload::Bytes(bytes) => {
513            Val::Variant("bytes".to_owned(), Some(Box::new(bytes_to_list_val(bytes))))
514        }
515        WitPayload::Control(value) => {
516            Val::Variant("control".to_owned(), Some(Box::new(Val::String(value))))
517        }
518    }
519}
520
521fn bytes_to_list_val(bytes: Vec<u8>) -> Val {
522    Val::List(bytes.into_iter().map(Val::U8).collect())
523}
524
525fn batch_outputs_from_result_val(value: Val) -> Result<BatchOutputs> {
526    let result: std::result::Result<Option<Box<Val>>, Option<Box<Val>>> = match value {
527        Val::Result(result) => result,
528        _ => {
529            return Err(PureflowError::execution(
530                "guest returned non-result from batch.invoke",
531            ));
532        }
533    };
534
535    match result {
536        Ok(Some(value)) => port_batches_from_val(*value).and_then(from_wit_port_batches),
537        Ok(None) => Err(PureflowError::execution(
538            "guest returned empty ok result from batch.invoke",
539        )),
540        Err(Some(value)) => Err(batch_error_from_val(*value)),
541        Err(None) => Err(PureflowError::execution(
542            "guest returned empty error from batch.invoke",
543        )),
544    }
545}
546
547fn batch_error_from_val(value: Val) -> PureflowError {
548    match value {
549        Val::Variant(name, Some(detail)) => match *detail {
550            Val::String(message) => {
551                PureflowError::execution(format!("guest returned {name}: {message}"))
552            }
553            _ => PureflowError::execution(format!("guest returned malformed {name} error")),
554        },
555        Val::Variant(name, None) => {
556            PureflowError::execution(format!("guest returned {name} without detail"))
557        }
558        _ => PureflowError::execution("guest returned malformed batch error"),
559    }
560}
561
562fn port_batches_from_val(value: Val) -> Result<Vec<WitPortBatch>> {
563    let values: Vec<Val> = match value {
564        Val::List(values) => values,
565        _ => {
566            return Err(PureflowError::execution(
567                "guest returned non-list batch output",
568            ));
569        }
570    };
571
572    values.into_iter().map(port_batch_from_val).collect()
573}
574
575fn port_batch_from_val(value: Val) -> Result<WitPortBatch> {
576    let fields: Vec<(String, Val)> = record_fields(value, "port batch")?;
577    let port_id: String = required_string_field(&fields, "port-id", "port batch")?;
578    let packets: Vec<WitPacket> = required_list_field(&fields, "packets", "port batch")?
579        .into_iter()
580        .map(packet_from_val)
581        .collect::<Result<Vec<_>>>()?;
582
583    Ok(WitPortBatch { port_id, packets })
584}
585
586fn packet_from_val(value: Val) -> Result<WitPacket> {
587    let fields: Vec<(String, Val)> = record_fields(value, "packet")?;
588    let metadata: MessageMetadata =
589        metadata_from_val(required_field(&fields, "metadata", "packet")?.clone())?;
590    let payload: WitPayload =
591        payload_from_val(required_field(&fields, "payload", "packet")?.clone())?;
592
593    Ok(WitPacket { metadata, payload })
594}
595
596fn metadata_from_val(value: Val) -> Result<MessageMetadata> {
597    let fields: Vec<(String, Val)> = record_fields(value, "message metadata")?;
598    let message_id: MessageId = MessageId::new(required_string_field(
599        &fields,
600        "message-id",
601        "message metadata",
602    )?)?;
603    let workflow_id: WorkflowId = WorkflowId::new(required_string_field(
604        &fields,
605        "workflow-id",
606        "message metadata",
607    )?)?;
608    let execution: ExecutionMetadata =
609        execution_from_val(required_field(&fields, "execution", "message metadata")?.clone())?;
610    let route: MessageRoute =
611        route_from_val(required_field(&fields, "route", "message metadata")?.clone())?;
612
613    Ok(MessageMetadata::new(
614        message_id,
615        workflow_id,
616        execution,
617        route,
618    ))
619}
620
621fn execution_from_val(value: Val) -> Result<ExecutionMetadata> {
622    let fields: Vec<(String, Val)> = record_fields(value, "execution metadata")?;
623    let execution_id: ExecutionId = ExecutionId::new(required_string_field(
624        &fields,
625        "execution-id",
626        "execution metadata",
627    )?)?;
628    let attempt: u32 = required_u32_field(&fields, "attempt", "execution metadata")?;
629    let attempt: ExecutionAttempt = NonZeroU32::new(attempt)
630        .map(ExecutionAttempt::new)
631        .ok_or_else(|| PureflowError::execution("guest returned zero execution attempt"))?;
632
633    Ok(ExecutionMetadata::new(execution_id, attempt))
634}
635
636fn route_from_val(value: Val) -> Result<MessageRoute> {
637    let fields: Vec<(String, Val)> = record_fields(value, "message route")?;
638    let source: Option<MessageEndpoint> = match required_field(&fields, "source", "message route")?
639    {
640        Val::Option(Some(source)) => Some(endpoint_from_val(source.as_ref().clone())?),
641        Val::Option(None) => None,
642        _ => {
643            return Err(PureflowError::execution(
644                "guest returned non-option route source",
645            ));
646        }
647    };
648    let target: MessageEndpoint =
649        endpoint_from_val(required_field(&fields, "target", "message route")?.clone())?;
650
651    Ok(MessageRoute::new(source, target))
652}
653
654fn endpoint_from_val(value: Val) -> Result<MessageEndpoint> {
655    let fields: Vec<(String, Val)> = record_fields(value, "message endpoint")?;
656    let node_id: NodeId = NodeId::new(required_string_field(
657        &fields,
658        "node-id",
659        "message endpoint",
660    )?)?;
661    let port_id: PortId = PortId::new(required_string_field(
662        &fields,
663        "port-id",
664        "message endpoint",
665    )?)?;
666
667    Ok(MessageEndpoint::new(node_id, port_id))
668}
669
670fn payload_from_val(value: Val) -> Result<WitPayload> {
671    let (name, payload): (String, Option<Box<Val>>) = match value {
672        Val::Variant(name, payload) => (name, payload),
673        _ => {
674            return Err(PureflowError::execution(
675                "guest returned non-variant payload",
676            ));
677        }
678    };
679    match (name.as_str(), payload) {
680        ("bytes", Some(value)) => Ok(WitPayload::Bytes(bytes_from_val(*value)?)),
681        ("control", Some(value)) => {
682            let value: String = match *value {
683                Val::String(value) => value,
684                _ => {
685                    return Err(PureflowError::execution(
686                        "guest returned non-string control payload",
687                    ));
688                }
689            };
690            Ok(WitPayload::Control(value))
691        }
692        (kind, _) => Err(PureflowError::execution(format!(
693            "guest returned unsupported payload variant: {kind}"
694        ))),
695    }
696}
697
698fn bytes_from_val(value: Val) -> Result<Vec<u8>> {
699    let values: Vec<Val> = match value {
700        Val::List(values) => values,
701        _ => {
702            return Err(PureflowError::execution(
703                "guest returned non-list bytes payload",
704            ));
705        }
706    };
707    values
708        .into_iter()
709        .map(|value: Val| match value {
710            Val::U8(byte) => Ok(byte),
711            _ => Err(PureflowError::execution(
712                "guest returned non-u8 byte payload element",
713            )),
714        })
715        .collect()
716}
717
718fn record_fields(value: Val, context: &str) -> Result<Vec<(String, Val)>> {
719    let fields: Vec<(String, Val)> = match value {
720        Val::Record(fields) => fields,
721        _ => {
722            return Err(PureflowError::execution(format!(
723                "guest returned non-record {context}"
724            )));
725        }
726    };
727    Ok(fields)
728}
729
730fn required_field<'a>(fields: &'a [(String, Val)], name: &str, context: &str) -> Result<&'a Val> {
731    fields
732        .iter()
733        .find_map(|(field_name, value): &(String, Val)| (field_name == name).then_some(value))
734        .ok_or_else(|| PureflowError::execution(format!("guest omitted {context} field {name}")))
735}
736
737fn required_string_field(fields: &[(String, Val)], name: &str, context: &str) -> Result<String> {
738    match required_field(fields, name, context)? {
739        Val::String(value) => Ok(value.clone()),
740        _ => Err(PureflowError::execution(format!(
741            "guest returned non-string {context} field {name}"
742        ))),
743    }
744}
745
746fn required_u32_field(fields: &[(String, Val)], name: &str, context: &str) -> Result<u32> {
747    match required_field(fields, name, context)? {
748        Val::U32(value) => Ok(*value),
749        _ => Err(PureflowError::execution(format!(
750            "guest returned non-u32 {context} field {name}"
751        ))),
752    }
753}
754
755fn required_list_field(fields: &[(String, Val)], name: &str, context: &str) -> Result<Vec<Val>> {
756    match required_field(fields, name, context)? {
757        Val::List(values) => Ok(values.clone()),
758        _ => Err(PureflowError::execution(format!(
759            "guest returned non-list {context} field {name}"
760        ))),
761    }
762}
763
764#[allow(clippy::match_wildcard_for_single_variants)]
765fn to_wit_packet(packet: &PortPacket) -> Result<WitPacket> {
766    let payload: WitPayload = match packet.payload() {
767        PacketPayload::Bytes(bytes) => WitPayload::Bytes(bytes.to_vec()),
768        PacketPayload::Control(value) => WitPayload::Control(value.to_string()),
769        #[allow(unreachable_patterns)]
770        _ => {
771            return Err(PureflowError::execution(
772                "payload is not supported by WIT ABI 0.1.0",
773            ));
774        }
775    };
776
777    Ok(WitPacket {
778        metadata: packet.metadata().clone(),
779        payload,
780    })
781}
782
783fn from_wit_packet(packet: WitPacket) -> Result<PortPacket> {
784    let payload: PacketPayload = match packet.payload {
785        WitPayload::Bytes(bytes) => PacketPayload::from(bytes),
786        WitPayload::Control(value) => {
787            let value: Value = serde_json::from_str(&value).map_err(|err: serde_json::Error| {
788                PureflowError::execution(format!("guest returned invalid control payload: {err}"))
789            })?;
790            PacketPayload::from(value)
791        }
792    };
793
794    Ok(PortPacket::new(packet.metadata, payload))
795}
796
797#[cfg(test)]
798mod tests {
799    use super::*;
800    use pureflow_core::{
801        capability::{EffectCapability, PortCapability, PortCapabilityDirection},
802        context::{CancellationRequest, CancellationToken, ExecutionAttempt, ExecutionMetadata},
803        message::{MessageEndpoint, MessageMetadata, MessageRoute},
804    };
805    use pureflow_types::{ExecutionId, MessageId, NodeId, WorkflowId};
806    use quickcheck::{Arbitrary, Gen, QuickCheck};
807    use serde::Deserialize;
808    use serde_json::json;
809    use std::{
810        collections::BTreeMap,
811        env,
812        ffi::OsString,
813        fs,
814        num::NonZeroU32,
815        path::{Path, PathBuf},
816        process::{Command, Output},
817        sync::OnceLock,
818    };
819
820    const UPPERCASE_FIXTURE_INPUTS_JSON: &str =
821        include_str!("../fixtures/uppercase-guest/testdata/inputs.json");
822    const UPPERCASE_FIXTURE_EXPECTED_OUTPUTS_JSON: &str =
823        include_str!("../fixtures/uppercase-guest/testdata/expected-outputs.json");
824    const UPPERCASE_FIXTURE_MANIFEST: &str = "fixtures/uppercase-guest/Cargo.toml";
825    const UPPERCASE_FIXTURE_ARTIFACT: &str =
826        "wasm32-wasip2/release/pureflow_wasm_uppercase_guest_fixture.wasm";
827    static QUICKCHECK_UPPERCASE_COMPONENT: OnceLock<WasmtimeBatchComponent> = OnceLock::new();
828
829    #[derive(Debug, Deserialize)]
830    struct FixturePortBatch {
831        #[serde(rename = "port-id")]
832        port_id: String,
833        packets: Vec<FixturePacket>,
834    }
835
836    impl FixturePortBatch {
837        fn into_wit(self) -> WitPortBatch {
838            WitPortBatch {
839                port_id: self.port_id,
840                packets: self
841                    .packets
842                    .into_iter()
843                    .map(FixturePacket::into_wit)
844                    .collect(),
845            }
846        }
847    }
848
849    #[derive(Debug, Deserialize)]
850    struct FixturePacket {
851        metadata: FixtureMessageMetadata,
852        payload: FixturePayload,
853    }
854
855    impl FixturePacket {
856        fn into_wit(self) -> WitPacket {
857            WitPacket {
858                metadata: self.metadata.into_message_metadata(),
859                payload: self.payload.into_wit(),
860            }
861        }
862    }
863
864    #[derive(Debug, Deserialize)]
865    struct FixtureMessageMetadata {
866        #[serde(rename = "message-id")]
867        message_id: String,
868        #[serde(rename = "workflow-id")]
869        workflow_id: String,
870        execution: FixtureExecution,
871        route: FixtureRoute,
872    }
873
874    impl FixtureMessageMetadata {
875        fn into_message_metadata(self) -> MessageMetadata {
876            MessageMetadata::new(
877                message_id(&self.message_id),
878                workflow_id(&self.workflow_id),
879                self.execution.into_execution_metadata(),
880                self.route.into_message_route(),
881            )
882        }
883    }
884
885    #[derive(Debug, Deserialize)]
886    struct FixtureExecution {
887        #[serde(rename = "execution-id")]
888        execution_id: String,
889        attempt: u32,
890    }
891
892    impl FixtureExecution {
893        fn into_execution_metadata(self) -> ExecutionMetadata {
894            let attempt: NonZeroU32 =
895                NonZeroU32::new(self.attempt).expect("fixture attempt must be non-zero");
896            ExecutionMetadata::new(
897                execution_id(&self.execution_id),
898                ExecutionAttempt::new(attempt),
899            )
900        }
901    }
902
903    #[derive(Debug, Deserialize)]
904    struct FixtureRoute {
905        source: Option<FixtureEndpoint>,
906        target: FixtureEndpoint,
907    }
908
909    impl FixtureRoute {
910        fn into_message_route(self) -> MessageRoute {
911            MessageRoute::new(
912                self.source.map(FixtureEndpoint::into_message_endpoint),
913                self.target.into_message_endpoint(),
914            )
915        }
916    }
917
918    #[derive(Debug, Deserialize)]
919    struct FixtureEndpoint {
920        #[serde(rename = "node-id")]
921        node_id: String,
922        #[serde(rename = "port-id")]
923        port_id: String,
924    }
925
926    impl FixtureEndpoint {
927        fn into_message_endpoint(self) -> MessageEndpoint {
928            MessageEndpoint::new(node_id(&self.node_id), port_id(&self.port_id))
929        }
930    }
931
932    #[derive(Debug, Deserialize)]
933    #[serde(rename_all = "kebab-case")]
934    enum FixturePayload {
935        Bytes(Vec<u8>),
936        Control(String),
937    }
938
939    impl FixturePayload {
940        fn into_wit(self) -> WitPayload {
941            match self {
942                Self::Bytes(bytes) => WitPayload::Bytes(bytes),
943                Self::Control(value) => WitPayload::Control(value),
944            }
945        }
946    }
947
948    #[derive(Debug, Clone)]
949    struct GeneratedBatch {
950        ports: Vec<GeneratedPortBatch>,
951    }
952
953    impl GeneratedBatch {
954        fn into_wit(self) -> Vec<WitPortBatch> {
955            self.ports
956                .into_iter()
957                .map(GeneratedPortBatch::into_wit)
958                .collect()
959        }
960    }
961
962    impl Arbitrary for GeneratedBatch {
963        fn arbitrary(g: &mut Gen) -> Self {
964            let port_count = usize::arbitrary(g) % 4;
965            let ports = (0..port_count)
966                .map(|port_index| GeneratedPortBatch::arbitrary(g, port_index))
967                .collect();
968
969            Self { ports }
970        }
971    }
972
973    #[derive(Debug, Clone)]
974    struct GeneratedPortBatch {
975        port_id: String,
976        packets: Vec<GeneratedPacket>,
977    }
978
979    impl GeneratedPortBatch {
980        fn arbitrary(g: &mut Gen, port_index: usize) -> Self {
981            let packet_count = usize::arbitrary(g) % 5;
982            let port_id = format!("in{port_index}");
983            let packets = (0..packet_count)
984                .map(|packet_index| GeneratedPacket::arbitrary(g, port_index, packet_index))
985                .collect();
986
987            Self { port_id, packets }
988        }
989
990        fn into_wit(self) -> WitPortBatch {
991            WitPortBatch {
992                port_id: self.port_id,
993                packets: self
994                    .packets
995                    .into_iter()
996                    .map(GeneratedPacket::into_wit)
997                    .collect(),
998            }
999        }
1000    }
1001
1002    #[derive(Debug, Clone)]
1003    struct GeneratedPacket {
1004        metadata: MessageMetadata,
1005        bytes: Vec<u8>,
1006    }
1007
1008    impl GeneratedPacket {
1009        fn arbitrary(g: &mut Gen, port_index: usize, packet_index: usize) -> Self {
1010            let byte_count = usize::arbitrary(g) % 65;
1011            let bytes = (0..byte_count).map(|_| u8::arbitrary(g)).collect();
1012            let source: MessageEndpoint = MessageEndpoint::new(node_id("source"), port_id("out"));
1013            let target: MessageEndpoint =
1014                MessageEndpoint::new(node_id("wasm"), port_id(&format!("in{port_index}")));
1015            let route: MessageRoute = MessageRoute::new(Some(source), target);
1016            let execution: ExecutionMetadata =
1017                ExecutionMetadata::first_attempt(execution_id("run-quickcheck"));
1018            let metadata = MessageMetadata::new(
1019                message_id(&format!("msg-{port_index}-{packet_index}")),
1020                workflow_id("flow-quickcheck"),
1021                execution,
1022                route,
1023            );
1024
1025            Self { metadata, bytes }
1026        }
1027
1028        fn into_wit(self) -> WitPacket {
1029            WitPacket {
1030                metadata: self.metadata,
1031                payload: WitPayload::Bytes(self.bytes),
1032            }
1033        }
1034    }
1035
1036    fn execution_id(value: &str) -> ExecutionId {
1037        ExecutionId::new(value).expect("valid execution id")
1038    }
1039
1040    fn message_id(value: &str) -> MessageId {
1041        MessageId::new(value).expect("valid message id")
1042    }
1043
1044    fn node_id(value: &str) -> NodeId {
1045        NodeId::new(value).expect("valid node id")
1046    }
1047
1048    fn port_id(value: &str) -> PortId {
1049        PortId::new(value).expect("valid port id")
1050    }
1051
1052    fn workflow_id(value: &str) -> WorkflowId {
1053        WorkflowId::new(value).expect("valid workflow id")
1054    }
1055
1056    fn metadata() -> MessageMetadata {
1057        let source: MessageEndpoint = MessageEndpoint::new(node_id("source"), port_id("out"));
1058        let target: MessageEndpoint = MessageEndpoint::new(node_id("wasm"), port_id("in"));
1059        let route: MessageRoute = MessageRoute::new(Some(source), target);
1060        let execution: ExecutionMetadata = ExecutionMetadata::first_attempt(execution_id("run-1"));
1061        MessageMetadata::new(message_id("msg-1"), workflow_id("flow"), execution, route)
1062    }
1063
1064    fn fixture_port_batches_from_json(json: &str) -> Vec<WitPortBatch> {
1065        serde_json::from_str::<Vec<FixturePortBatch>>(json)
1066            .expect("fixture JSON must parse")
1067            .into_iter()
1068            .map(FixturePortBatch::into_wit)
1069            .collect()
1070    }
1071
1072    fn batch_inputs_from_wit_port_batches(port_batches: Vec<WitPortBatch>) -> BatchInputs {
1073        let mut packets_by_port: BTreeMap<PortId, Vec<PortPacket>> = BTreeMap::new();
1074        for port_batch in port_batches {
1075            let port_id: PortId = port_id(&port_batch.port_id);
1076            let packets: Vec<PortPacket> = port_batch
1077                .packets
1078                .into_iter()
1079                .map(|packet: WitPacket| from_wit_packet(packet).expect("fixture packet decodes"))
1080                .collect();
1081            packets_by_port.insert(port_id, packets);
1082        }
1083
1084        BatchInputs::from_packets(packets_by_port)
1085    }
1086
1087    fn uppercase_fixture_outputs(inputs: &[WitPortBatch]) -> Vec<WitPortBatch> {
1088        let mut packets: Vec<WitPacket> = Vec::new();
1089
1090        for port_batch in inputs {
1091            for packet in &port_batch.packets {
1092                let mut packet: WitPacket = packet.clone();
1093                let WitPayload::Bytes(bytes) = packet.payload else {
1094                    panic!("uppercase fixture success vectors must contain only byte payloads");
1095                };
1096                packet.payload = WitPayload::Bytes(
1097                    bytes
1098                        .into_iter()
1099                        .map(|byte: u8| byte.to_ascii_uppercase())
1100                        .collect(),
1101                );
1102                packets.push(packet);
1103            }
1104        }
1105
1106        vec![WitPortBatch {
1107            port_id: "out".to_owned(),
1108            packets,
1109        }]
1110    }
1111
1112    fn build_uppercase_guest_fixture() -> PathBuf {
1113        let crate_dir: PathBuf = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1114        let manifest_path: PathBuf = crate_dir.join(UPPERCASE_FIXTURE_MANIFEST);
1115        let target_dir: PathBuf = env::temp_dir().join(format!(
1116            "pureflow-wasm-uppercase-guest-fixture-{}",
1117            std::process::id()
1118        ));
1119        let artifact_path: PathBuf = target_dir.join(UPPERCASE_FIXTURE_ARTIFACT);
1120        let cargo: OsString = env::var_os("CARGO").unwrap_or_else(|| OsString::from("cargo"));
1121        let output: Output = Command::new(cargo)
1122            .args([
1123                "build",
1124                "--manifest-path",
1125                path_as_str(&manifest_path),
1126                "--target",
1127                "wasm32-wasip2",
1128                "--release",
1129                "--target-dir",
1130                path_as_str(&target_dir),
1131            ])
1132            .env_remove("RUSTFLAGS")
1133            .output()
1134            .expect("fixture build command should run");
1135
1136        assert!(
1137            output.status.success(),
1138            "fixture build failed\nstdout:\n{}\nstderr:\n{}",
1139            String::from_utf8_lossy(&output.stdout),
1140            String::from_utf8_lossy(&output.stderr),
1141        );
1142        assert!(
1143            artifact_path.is_file(),
1144            "fixture artifact was not written to {}",
1145            artifact_path.display(),
1146        );
1147
1148        artifact_path
1149    }
1150
1151    fn wasm32_wasip2_target_available() -> bool {
1152        let rustc: OsString = env::var_os("RUSTC").unwrap_or_else(|| OsString::from("rustc"));
1153        let Ok(output) = Command::new(rustc)
1154            .args(["--print", "target-libdir", "--target", "wasm32-wasip2"])
1155            .env_remove("RUSTFLAGS")
1156            .output()
1157        else {
1158            return false;
1159        };
1160        if !output.status.success() {
1161            return false;
1162        }
1163        let libdir: PathBuf = PathBuf::from(String::from_utf8_lossy(&output.stdout).trim());
1164        fs::read_dir(libdir).is_ok_and(|entries| {
1165            entries.filter_map(std::result::Result::ok).any(|entry| {
1166                entry.file_name().to_str().is_some_and(|name| {
1167                    name.starts_with("libcore-")
1168                        && Path::new(name)
1169                            .extension()
1170                            .is_some_and(|extension| extension.eq_ignore_ascii_case("rlib"))
1171                })
1172            })
1173        })
1174    }
1175
1176    fn path_as_str(path: &Path) -> &str {
1177        path.to_str().expect("fixture path should be UTF-8")
1178    }
1179
1180    #[test]
1181    fn constants_name_the_wit_abi() {
1182        assert_eq!(WIT_PACKAGE, "pureflow:batch@0.1.0");
1183        assert_eq!(WIT_WORLD, "pureflow-node");
1184    }
1185
1186    #[test]
1187    fn wasm_capabilities_accept_import_free_descriptor() {
1188        let capabilities: NodeCapabilities = NodeCapabilities::native_passive(
1189            node_id("wasm"),
1190            [
1191                PortCapability::new(port_id("in"), PortCapabilityDirection::Receive),
1192                PortCapability::new(port_id("out"), PortCapabilityDirection::Emit),
1193            ],
1194        )
1195        .expect("valid capabilities");
1196
1197        validate_wasm_capabilities(&capabilities).expect("no host imports required");
1198    }
1199
1200    #[test]
1201    fn wasm_capabilities_reject_effects_without_imports() {
1202        let capabilities: NodeCapabilities = NodeCapabilities::new(
1203            node_id("wasm"),
1204            [PortCapability::new(
1205                port_id("in"),
1206                PortCapabilityDirection::Receive,
1207            )],
1208            [EffectCapability::Clock],
1209        )
1210        .expect("valid descriptor shape");
1211
1212        let err: PureflowError =
1213            validate_wasm_capabilities(&capabilities).expect_err("effect must be denied");
1214
1215        assert_eq!(err.code(), pureflow_core::ErrorCode::InvalidCapabilities);
1216        assert!(err.to_string().contains("not enforceable"));
1217    }
1218
1219    #[test]
1220    fn port_batches_round_trip_bytes_and_control_payloads() {
1221        let mut inputs: BatchInputs = BatchInputs::new();
1222        inputs.push(
1223            port_id("in"),
1224            PortPacket::new(
1225                metadata(),
1226                PacketPayload::from(b"bytes".as_slice().to_vec()),
1227            ),
1228        );
1229        inputs.push(
1230            port_id("control"),
1231            PortPacket::new(metadata(), PacketPayload::from(json!({"op": "flush"}))),
1232        );
1233
1234        let wit_batches: Vec<WitPortBatch> =
1235            to_wit_port_batches(&inputs).expect("inputs should encode as WIT batches");
1236        let outputs: BatchOutputs =
1237            from_wit_port_batches(wit_batches).expect("WIT batches should decode");
1238
1239        assert_eq!(outputs.packets(&port_id("in")).len(), 1);
1240        assert_eq!(outputs.packets(&port_id("control")).len(), 1);
1241    }
1242
1243    #[test]
1244    fn invalid_control_payload_is_rejected() {
1245        let packet: WitPacket = WitPacket {
1246            metadata: metadata(),
1247            payload: WitPayload::Control("not-json".to_owned()),
1248        };
1249
1250        let err: PureflowError = from_wit_packet(packet).expect_err("invalid JSON should fail");
1251
1252        assert_eq!(err.code(), pureflow_core::ErrorCode::NodeExecutionFailed);
1253    }
1254
1255    #[test]
1256    fn dynamic_result_value_decodes_outputs() {
1257        let output = WitPortBatch {
1258            port_id: "out".to_owned(),
1259            packets: vec![WitPacket {
1260                metadata: metadata(),
1261                payload: WitPayload::Bytes(b"payload".to_vec()),
1262            }],
1263        };
1264        let result = Val::Result(Ok(Some(Box::new(Val::List(vec![port_batch_to_val(
1265            output,
1266        )])))));
1267
1268        let outputs = batch_outputs_from_result_val(result).expect("result should decode");
1269
1270        assert_eq!(outputs.packets(&port_id("out")).len(), 1);
1271    }
1272
1273    #[test]
1274    fn dynamic_guest_error_maps_to_execution_error() {
1275        let result = Val::Result(Err(Some(Box::new(Val::Variant(
1276            "guest-failure".to_owned(),
1277            Some(Box::new(Val::String("boom".to_owned()))),
1278        )))));
1279
1280        let err = batch_outputs_from_result_val(result).expect_err("guest error should fail");
1281
1282        assert_eq!(err.code(), pureflow_core::ErrorCode::NodeExecutionFailed);
1283    }
1284
1285    #[test]
1286    fn uppercase_guest_fixture_testdata_matches_wit_shape() {
1287        let inputs: Vec<WitPortBatch> =
1288            fixture_port_batches_from_json(UPPERCASE_FIXTURE_INPUTS_JSON);
1289        let expected_outputs: Vec<WitPortBatch> =
1290            fixture_port_batches_from_json(UPPERCASE_FIXTURE_EXPECTED_OUTPUTS_JSON);
1291
1292        assert_eq!(inputs.len(), 1);
1293        assert_eq!(inputs[0].port_id, "in");
1294        assert_eq!(expected_outputs, uppercase_fixture_outputs(&inputs));
1295
1296        let outputs: BatchOutputs =
1297            from_wit_port_batches(expected_outputs).expect("expected fixture outputs must decode");
1298        assert_eq!(outputs.packets(&port_id("out")).len(), 2);
1299    }
1300
1301    #[test]
1302    fn wasmtime_adapter_invokes_real_uppercase_guest_fixture() {
1303        if !wasm32_wasip2_target_available() {
1304            eprintln!(
1305                "skipping real WASM guest conformance test; run through `nix develop .` to provide wasm32-wasip2"
1306            );
1307            return;
1308        }
1309
1310        let fixture_path: PathBuf = build_uppercase_guest_fixture();
1311        let fixture_bytes: Vec<u8> = fs::read(fixture_path).expect("fixture component is readable");
1312        let component: WasmtimeBatchComponent =
1313            WasmtimeBatchComponent::from_component_bytes(fixture_bytes)
1314                .expect("fixture component compiles");
1315        let empty_outputs: BatchOutputs = component
1316            .invoke(&BatchInputs::new())
1317            .expect("fixture guest accepts an empty batch");
1318        assert!(empty_outputs.packets(&port_id("out")).is_empty());
1319
1320        let inputs: Vec<WitPortBatch> =
1321            fixture_port_batches_from_json(UPPERCASE_FIXTURE_INPUTS_JSON);
1322        let expected_outputs: BatchOutputs = from_wit_port_batches(fixture_port_batches_from_json(
1323            UPPERCASE_FIXTURE_EXPECTED_OUTPUTS_JSON,
1324        ))
1325        .expect("expected fixture outputs decode");
1326
1327        let actual_outputs: BatchOutputs = component
1328            .invoke(&batch_inputs_from_wit_port_batches(inputs))
1329            .expect("fixture guest invocation succeeds");
1330
1331        assert_eq!(actual_outputs, expected_outputs);
1332    }
1333
1334    #[test]
1335    fn wasmtime_adapter_rejects_pre_cancelled_invocation() {
1336        if !wasm32_wasip2_target_available() {
1337            eprintln!(
1338                "skipping WASM cancellation test; run through `nix develop .` to provide wasm32-wasip2"
1339            );
1340            return;
1341        }
1342
1343        let fixture_path: PathBuf = build_uppercase_guest_fixture();
1344        let fixture_bytes: Vec<u8> = fs::read(fixture_path).expect("fixture component is readable");
1345        let component: WasmtimeBatchComponent =
1346            WasmtimeBatchComponent::from_component_bytes(fixture_bytes)
1347                .expect("fixture component compiles");
1348        let cancellation: CancellationToken =
1349            CancellationToken::cancelled(CancellationRequest::new("test shutdown"));
1350
1351        let err: PureflowError = component
1352            .invoke_with_cancellation(&BatchInputs::new(), &cancellation)
1353            .expect_err("pre-cancelled invocation must fail before guest execution");
1354
1355        assert_eq!(err.code(), pureflow_core::ErrorCode::ExecutionCancelled);
1356        assert!(err.to_string().contains("test shutdown"));
1357    }
1358
1359    #[test]
1360    fn wasmtime_adapter_reports_stable_fuel_limit_error() {
1361        if !wasm32_wasip2_target_available() {
1362            eprintln!(
1363                "skipping WASM fuel limit test; run through `nix develop .` to provide wasm32-wasip2"
1364            );
1365            return;
1366        }
1367
1368        let fixture_path: PathBuf = build_uppercase_guest_fixture();
1369        let fixture_bytes: Vec<u8> = fs::read(fixture_path).expect("fixture component is readable");
1370        let component: WasmtimeBatchComponent =
1371            WasmtimeBatchComponent::from_component_bytes_with_limits(
1372                fixture_bytes,
1373                WasmtimeExecutionLimits::new(0),
1374            )
1375            .expect("fixture component compiles");
1376        let inputs: Vec<WitPortBatch> =
1377            fixture_port_batches_from_json(UPPERCASE_FIXTURE_INPUTS_JSON);
1378
1379        let err: PureflowError = component
1380            .invoke(&batch_inputs_from_wit_port_batches(inputs))
1381            .expect_err("zero fuel should trap with a stable host error");
1382
1383        assert_eq!(err.code(), pureflow_core::ErrorCode::NodeExecutionFailed);
1384        assert!(
1385            err.to_string()
1386                .contains("guest exceeded Wasmtime fuel limit of 0 units"),
1387            "unexpected fuel error: {err}"
1388        );
1389    }
1390
1391    #[test]
1392    fn wasmtime_adapter_preserves_generated_byte_batches_across_component_boundary() {
1393        if !wasm32_wasip2_target_available() {
1394            eprintln!(
1395                "skipping generated WASM boundary conformance test; run through `nix develop .` to provide wasm32-wasip2"
1396            );
1397            return;
1398        }
1399
1400        let fixture_path: PathBuf = build_uppercase_guest_fixture();
1401        let fixture_bytes: Vec<u8> = fs::read(fixture_path).expect("fixture component is readable");
1402        let component: WasmtimeBatchComponent =
1403            WasmtimeBatchComponent::from_component_bytes(fixture_bytes)
1404                .expect("fixture component compiles");
1405        let _ = QUICKCHECK_UPPERCASE_COMPONENT.set(component);
1406
1407        QuickCheck::new()
1408            .tests(32)
1409            .quickcheck(generated_byte_batch_boundary_holds as fn(GeneratedBatch) -> bool);
1410    }
1411
1412    fn generated_byte_batch_boundary_holds(generated: GeneratedBatch) -> bool {
1413        let component = QUICKCHECK_UPPERCASE_COMPONENT
1414            .get()
1415            .expect("quickcheck component initialized");
1416        let inputs: Vec<WitPortBatch> = generated.into_wit();
1417        let expected_outputs: BatchOutputs =
1418            from_wit_port_batches(uppercase_fixture_outputs(&inputs))
1419                .expect("generated expected outputs decode");
1420        let actual_outputs: BatchOutputs = component
1421            .invoke(&batch_inputs_from_wit_port_batches(inputs))
1422            .expect("generated fixture guest invocation succeeds");
1423
1424        actual_outputs == expected_outputs
1425    }
1426}