1use std::{
9 num::NonZeroU32,
10 sync::{
11 Arc,
12 atomic::{AtomicBool, Ordering},
13 },
14 thread::{self, JoinHandle},
15 time::Duration,
16};
17
18use pureflow_core::{
19 BatchExecutor, BatchInputs, BatchOutputs, PureflowError, PacketPayload, PortPacket, Result,
20 capability::{CapabilityValidationError, NodeCapabilities},
21 context::{CancellationRequest, CancellationToken, ExecutionAttempt, ExecutionMetadata},
22 message::{MessageEndpoint, MessageMetadata, MessageRoute},
23};
24use pureflow_types::{ExecutionId, MessageId, NodeId, PortId, WorkflowId};
25use serde_json::Value;
26use wasmtime::{
27 Config, Engine, Store,
28 component::{Component, ComponentExportIndex, Func, Instance, Linker, Val},
29};
30
31pub const WIT_PACKAGE: &str = "pureflow:batch@0.1.0";
33
34pub const WIT_WORLD: &str = "pureflow-node";
36
37const DEFAULT_GUEST_FUEL: u64 = 100_000_000;
38const DEFAULT_CANCELLATION_EPOCH_DEADLINE: u64 = 1;
39const DEFAULT_CANCELLATION_POLL_INTERVAL: Duration = Duration::from_millis(1);
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub struct WasmtimeExecutionLimits {
44 fuel: u64,
45 cancellation_epoch_deadline: u64,
46 cancellation_poll_interval: Duration,
47}
48
49impl WasmtimeExecutionLimits {
50 #[must_use]
52 pub const fn new(fuel: u64) -> Self {
53 Self {
54 fuel,
55 cancellation_epoch_deadline: DEFAULT_CANCELLATION_EPOCH_DEADLINE,
56 cancellation_poll_interval: DEFAULT_CANCELLATION_POLL_INTERVAL,
57 }
58 }
59
60 #[must_use]
62 pub const fn fuel(&self) -> u64 {
63 self.fuel
64 }
65
66 #[must_use]
68 pub const fn cancellation_epoch_deadline(&self) -> u64 {
69 self.cancellation_epoch_deadline
70 }
71
72 #[must_use]
74 pub const fn cancellation_poll_interval(&self) -> Duration {
75 self.cancellation_poll_interval
76 }
77
78 #[must_use]
80 pub const fn with_cancellation_epoch_deadline(mut self, ticks: u64) -> Self {
81 self.cancellation_epoch_deadline = ticks;
82 self
83 }
84
85 #[must_use]
87 pub const fn with_cancellation_poll_interval(mut self, interval: Duration) -> Self {
88 self.cancellation_poll_interval = interval;
89 self
90 }
91}
92
93impl Default for WasmtimeExecutionLimits {
94 fn default() -> Self {
95 Self::new(DEFAULT_GUEST_FUEL)
96 }
97}
98
99pub struct WasmtimeBatchComponent {
101 engine: Engine,
102 component: Component,
103 limits: WasmtimeExecutionLimits,
104}
105
106impl WasmtimeBatchComponent {
107 pub fn from_component_bytes(bytes: impl AsRef<[u8]>) -> Result<Self> {
114 Self::from_component_bytes_with_limits(bytes, WasmtimeExecutionLimits::default())
115 }
116
117 pub fn from_component_bytes_with_limits(
124 bytes: impl AsRef<[u8]>,
125 limits: WasmtimeExecutionLimits,
126 ) -> Result<Self> {
127 let engine: Engine = component_engine()?;
128 let component: Component =
129 Component::from_binary(&engine, bytes.as_ref()).map_err(|err: wasmtime::Error| {
130 PureflowError::execution(format!("failed to compile component: {err}"))
131 })?;
132
133 Ok(Self {
134 engine,
135 component,
136 limits,
137 })
138 }
139
140 pub fn from_component_bytes_with_capabilities(
148 bytes: impl AsRef<[u8]>,
149 capabilities: &NodeCapabilities,
150 ) -> Result<Self> {
151 Self::from_component_bytes_with_capabilities_and_limits(
152 bytes,
153 capabilities,
154 WasmtimeExecutionLimits::default(),
155 )
156 }
157
158 pub fn from_component_bytes_with_capabilities_and_limits(
167 bytes: impl AsRef<[u8]>,
168 capabilities: &NodeCapabilities,
169 limits: WasmtimeExecutionLimits,
170 ) -> Result<Self> {
171 validate_wasm_capabilities(capabilities)?;
172 Self::from_component_bytes_with_limits(bytes, limits)
173 }
174
175 #[must_use]
177 pub const fn limits(&self) -> WasmtimeExecutionLimits {
178 self.limits
179 }
180
181 pub fn invoke(&self, inputs: &BatchInputs) -> Result<BatchOutputs> {
188 self.invoke_with_cancellation(inputs, &CancellationToken::active())
189 }
190
191 pub fn invoke_with_cancellation(
201 &self,
202 inputs: &BatchInputs,
203 cancellation: &CancellationToken,
204 ) -> Result<BatchOutputs> {
205 if let Some(request) = cancellation.request() {
206 return Err(PureflowError::cancelled(request.reason()));
207 }
208
209 let linker: Linker<()> = Linker::new(&self.engine);
210 let mut store: Store<()> = Store::new(&self.engine, ());
211 store.set_epoch_deadline(self.limits.cancellation_epoch_deadline());
212 store
213 .set_fuel(self.limits.fuel())
214 .map_err(|err: wasmtime::Error| {
215 PureflowError::execution(format!("failed to configure guest fuel: {err}"))
216 })?;
217 let watcher: CancellationWatcher = CancellationWatcher::spawn(
218 self.engine.clone(),
219 cancellation.clone(),
220 self.limits.cancellation_poll_interval(),
221 )?;
222 let instance: Instance =
223 linker
224 .instantiate(&mut store, &self.component)
225 .map_err(|err: wasmtime::Error| {
226 PureflowError::execution(format!("failed to instantiate component: {err}"))
227 })?;
228 let batch_index: ComponentExportIndex = instance
229 .get_export_index(&mut store, None, "pureflow:batch/batch@0.1.0")
230 .ok_or_else(|| {
231 PureflowError::execution("component does not export pureflow:batch/batch@0.1.0")
232 })?;
233 let invoke_index: ComponentExportIndex = instance
234 .get_export_index(&mut store, Some(&batch_index), "invoke")
235 .ok_or_else(|| PureflowError::execution("component does not export batch.invoke"))?;
236 let invoke: Func = instance
237 .get_func(&mut store, invoke_index)
238 .ok_or_else(|| PureflowError::execution("batch.invoke export is not a function"))?;
239
240 let params: [Val; 1] = [batch_inputs_to_val(inputs)?];
241 let mut results: [Val; 1] = [Val::Bool(false)];
242 let call_result: std::result::Result<(), wasmtime::Error> =
243 invoke.call(&mut store, ¶ms, &mut results);
244 let interrupted: bool = watcher.finish();
245 if interrupted {
246 let reason: String = cancellation.request().map_or_else(
247 || String::from("wasm guest invocation cancelled"),
248 |request: CancellationRequest| request.reason().to_owned(),
249 );
250 return Err(PureflowError::cancelled(reason));
251 }
252 let remaining_fuel: Option<u64> = store.get_fuel().ok();
253 call_result.map_err(|err: wasmtime::Error| {
254 map_guest_call_error(&err, self.limits, remaining_fuel)
255 })?;
256
257 let [result]: [Val; 1] = results;
258 batch_outputs_from_result_val(result)
259 }
260}
261
262impl BatchExecutor for WasmtimeBatchComponent {
263 fn invoke(&self, inputs: BatchInputs) -> Result<BatchOutputs> {
264 Self::invoke(self, &inputs)
265 }
266}
267
268pub fn validate_wasm_capabilities(capabilities: &NodeCapabilities) -> Result<()> {
274 if let Some(effect) = capabilities.effects().first() {
275 return Err(CapabilityValidationError::UnenforceableEffectCapability {
276 node_id: capabilities.node_id().clone(),
277 effect: *effect,
278 }
279 .into());
280 }
281
282 Ok(())
283}
284
285pub fn to_wit_port_batches(inputs: &BatchInputs) -> Result<Vec<WitPortBatch>> {
291 inputs
292 .packets_by_port()
293 .iter()
294 .map(|(port_id, packets): (&PortId, &Vec<PortPacket>)| {
295 Ok(WitPortBatch {
296 port_id: port_id.to_string(),
297 packets: packets
298 .iter()
299 .map(to_wit_packet)
300 .collect::<Result<Vec<_>>>()?,
301 })
302 })
303 .collect()
304}
305
306pub fn from_wit_port_batches(port_batches: Vec<WitPortBatch>) -> Result<BatchOutputs> {
313 let mut outputs: BatchOutputs = BatchOutputs::new();
314 for port_batch in port_batches {
315 let port_id: PortId = PortId::new(port_batch.port_id)?;
316 for packet in port_batch.packets {
317 outputs.push(port_id.clone(), from_wit_packet(packet)?);
318 }
319 }
320
321 Ok(outputs)
322}
323
324#[derive(Debug, Clone, PartialEq, Eq)]
326pub struct WitPortBatch {
327 pub port_id: String,
329 pub packets: Vec<WitPacket>,
331}
332
333#[derive(Debug, Clone, PartialEq, Eq)]
335pub struct WitPacket {
336 pub metadata: pureflow_core::message::MessageMetadata,
338 pub payload: WitPayload,
340}
341
342#[derive(Debug, Clone, PartialEq, Eq)]
344pub enum WitPayload {
345 Bytes(Vec<u8>),
347 Control(String),
349}
350
351fn component_engine() -> Result<Engine> {
352 let mut config: Config = Config::new();
353 config.wasm_component_model(true);
354 config.epoch_interruption(true);
355 config.consume_fuel(true);
356 Engine::new(&config).map_err(|err: wasmtime::Error| {
357 PureflowError::execution(format!("failed to create Wasmtime engine: {err}"))
358 })
359}
360
361struct CancellationWatcher {
362 complete: Arc<AtomicBool>,
363 interrupted: Arc<AtomicBool>,
364 thread: Option<JoinHandle<()>>,
365}
366
367impl CancellationWatcher {
368 fn spawn(engine: Engine, cancellation: CancellationToken, interval: Duration) -> Result<Self> {
369 let complete: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
370 let interrupted: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
371 let thread_complete: Arc<AtomicBool> = Arc::clone(&complete);
372 let thread_interrupted: Arc<AtomicBool> = Arc::clone(&interrupted);
373 let thread: JoinHandle<()> = thread::Builder::new()
374 .name(String::from("pureflow-wasm-cancellation"))
375 .spawn(move || {
376 while !thread_complete.load(Ordering::Acquire) {
377 if cancellation.is_cancelled() {
378 thread_interrupted.store(true, Ordering::Release);
379 engine.increment_epoch();
380 break;
381 }
382 thread::sleep(interval);
383 }
384 })
385 .map_err(|err: std::io::Error| {
386 PureflowError::execution(format!("failed to start WASM cancellation watcher: {err}"))
387 })?;
388
389 Ok(Self {
390 complete,
391 interrupted,
392 thread: Some(thread),
393 })
394 }
395
396 fn finish(mut self) -> bool {
397 self.complete.store(true, Ordering::Release);
398 if let Some(thread) = self.thread.take() {
399 let _ = thread.join();
400 }
401 self.interrupted.load(Ordering::Acquire)
402 }
403}
404
405impl Drop for CancellationWatcher {
406 fn drop(&mut self) {
407 self.complete.store(true, Ordering::Release);
408 if let Some(thread) = self.thread.take() {
409 let _ = thread.join();
410 }
411 }
412}
413
414fn map_guest_call_error(
415 err: &wasmtime::Error,
416 limits: WasmtimeExecutionLimits,
417 remaining_fuel: Option<u64>,
418) -> PureflowError {
419 let message: String = err.to_string();
420 if remaining_fuel == Some(0) || message.to_ascii_lowercase().contains("fuel") {
421 PureflowError::execution(format!(
422 "guest exceeded Wasmtime fuel limit of {} units",
423 limits.fuel()
424 ))
425 } else {
426 PureflowError::execution(format!("guest invoke failed: {err}"))
427 }
428}
429
430fn batch_inputs_to_val(inputs: &BatchInputs) -> Result<Val> {
431 Ok(Val::List(
432 to_wit_port_batches(inputs)?
433 .into_iter()
434 .map(port_batch_to_val)
435 .collect(),
436 ))
437}
438
439fn port_batch_to_val(port_batch: WitPortBatch) -> Val {
440 Val::Record(vec![
441 ("port-id".to_owned(), Val::String(port_batch.port_id)),
442 (
443 "packets".to_owned(),
444 Val::List(port_batch.packets.into_iter().map(packet_to_val).collect()),
445 ),
446 ])
447}
448
449fn packet_to_val(packet: WitPacket) -> Val {
450 Val::Record(vec![
451 ("metadata".to_owned(), metadata_to_val(&packet.metadata)),
452 ("payload".to_owned(), payload_to_val(packet.payload)),
453 ])
454}
455
456fn metadata_to_val(metadata: &MessageMetadata) -> Val {
457 Val::Record(vec![
458 (
459 "message-id".to_owned(),
460 Val::String(metadata.message_id().to_string()),
461 ),
462 (
463 "workflow-id".to_owned(),
464 Val::String(metadata.workflow_id().to_string()),
465 ),
466 (
467 "execution".to_owned(),
468 Val::Record(vec![
469 (
470 "execution-id".to_owned(),
471 Val::String(metadata.execution().execution_id().to_string()),
472 ),
473 (
474 "attempt".to_owned(),
475 Val::U32(metadata.execution().attempt().get()),
476 ),
477 ]),
478 ),
479 ("route".to_owned(), route_to_val(metadata.route())),
480 ])
481}
482
483fn route_to_val(route: &MessageRoute) -> Val {
484 Val::Record(vec![
485 (
486 "source".to_owned(),
487 Val::Option(
488 route
489 .source()
490 .map(|source: &MessageEndpoint| Box::new(endpoint_to_val(source))),
491 ),
492 ),
493 ("target".to_owned(), endpoint_to_val(route.target())),
494 ])
495}
496
497fn endpoint_to_val(endpoint: &MessageEndpoint) -> Val {
498 Val::Record(vec![
499 (
500 "node-id".to_owned(),
501 Val::String(endpoint.node_id().to_string()),
502 ),
503 (
504 "port-id".to_owned(),
505 Val::String(endpoint.port_id().to_string()),
506 ),
507 ])
508}
509
510fn payload_to_val(payload: WitPayload) -> Val {
511 match payload {
512 WitPayload::Bytes(bytes) => {
513 Val::Variant("bytes".to_owned(), Some(Box::new(bytes_to_list_val(bytes))))
514 }
515 WitPayload::Control(value) => {
516 Val::Variant("control".to_owned(), Some(Box::new(Val::String(value))))
517 }
518 }
519}
520
521fn bytes_to_list_val(bytes: Vec<u8>) -> Val {
522 Val::List(bytes.into_iter().map(Val::U8).collect())
523}
524
525fn batch_outputs_from_result_val(value: Val) -> Result<BatchOutputs> {
526 let result: std::result::Result<Option<Box<Val>>, Option<Box<Val>>> = match value {
527 Val::Result(result) => result,
528 _ => {
529 return Err(PureflowError::execution(
530 "guest returned non-result from batch.invoke",
531 ));
532 }
533 };
534
535 match result {
536 Ok(Some(value)) => port_batches_from_val(*value).and_then(from_wit_port_batches),
537 Ok(None) => Err(PureflowError::execution(
538 "guest returned empty ok result from batch.invoke",
539 )),
540 Err(Some(value)) => Err(batch_error_from_val(*value)),
541 Err(None) => Err(PureflowError::execution(
542 "guest returned empty error from batch.invoke",
543 )),
544 }
545}
546
547fn batch_error_from_val(value: Val) -> PureflowError {
548 match value {
549 Val::Variant(name, Some(detail)) => match *detail {
550 Val::String(message) => {
551 PureflowError::execution(format!("guest returned {name}: {message}"))
552 }
553 _ => PureflowError::execution(format!("guest returned malformed {name} error")),
554 },
555 Val::Variant(name, None) => {
556 PureflowError::execution(format!("guest returned {name} without detail"))
557 }
558 _ => PureflowError::execution("guest returned malformed batch error"),
559 }
560}
561
562fn port_batches_from_val(value: Val) -> Result<Vec<WitPortBatch>> {
563 let values: Vec<Val> = match value {
564 Val::List(values) => values,
565 _ => {
566 return Err(PureflowError::execution(
567 "guest returned non-list batch output",
568 ));
569 }
570 };
571
572 values.into_iter().map(port_batch_from_val).collect()
573}
574
575fn port_batch_from_val(value: Val) -> Result<WitPortBatch> {
576 let fields: Vec<(String, Val)> = record_fields(value, "port batch")?;
577 let port_id: String = required_string_field(&fields, "port-id", "port batch")?;
578 let packets: Vec<WitPacket> = required_list_field(&fields, "packets", "port batch")?
579 .into_iter()
580 .map(packet_from_val)
581 .collect::<Result<Vec<_>>>()?;
582
583 Ok(WitPortBatch { port_id, packets })
584}
585
586fn packet_from_val(value: Val) -> Result<WitPacket> {
587 let fields: Vec<(String, Val)> = record_fields(value, "packet")?;
588 let metadata: MessageMetadata =
589 metadata_from_val(required_field(&fields, "metadata", "packet")?.clone())?;
590 let payload: WitPayload =
591 payload_from_val(required_field(&fields, "payload", "packet")?.clone())?;
592
593 Ok(WitPacket { metadata, payload })
594}
595
596fn metadata_from_val(value: Val) -> Result<MessageMetadata> {
597 let fields: Vec<(String, Val)> = record_fields(value, "message metadata")?;
598 let message_id: MessageId = MessageId::new(required_string_field(
599 &fields,
600 "message-id",
601 "message metadata",
602 )?)?;
603 let workflow_id: WorkflowId = WorkflowId::new(required_string_field(
604 &fields,
605 "workflow-id",
606 "message metadata",
607 )?)?;
608 let execution: ExecutionMetadata =
609 execution_from_val(required_field(&fields, "execution", "message metadata")?.clone())?;
610 let route: MessageRoute =
611 route_from_val(required_field(&fields, "route", "message metadata")?.clone())?;
612
613 Ok(MessageMetadata::new(
614 message_id,
615 workflow_id,
616 execution,
617 route,
618 ))
619}
620
621fn execution_from_val(value: Val) -> Result<ExecutionMetadata> {
622 let fields: Vec<(String, Val)> = record_fields(value, "execution metadata")?;
623 let execution_id: ExecutionId = ExecutionId::new(required_string_field(
624 &fields,
625 "execution-id",
626 "execution metadata",
627 )?)?;
628 let attempt: u32 = required_u32_field(&fields, "attempt", "execution metadata")?;
629 let attempt: ExecutionAttempt = NonZeroU32::new(attempt)
630 .map(ExecutionAttempt::new)
631 .ok_or_else(|| PureflowError::execution("guest returned zero execution attempt"))?;
632
633 Ok(ExecutionMetadata::new(execution_id, attempt))
634}
635
636fn route_from_val(value: Val) -> Result<MessageRoute> {
637 let fields: Vec<(String, Val)> = record_fields(value, "message route")?;
638 let source: Option<MessageEndpoint> = match required_field(&fields, "source", "message route")?
639 {
640 Val::Option(Some(source)) => Some(endpoint_from_val(source.as_ref().clone())?),
641 Val::Option(None) => None,
642 _ => {
643 return Err(PureflowError::execution(
644 "guest returned non-option route source",
645 ));
646 }
647 };
648 let target: MessageEndpoint =
649 endpoint_from_val(required_field(&fields, "target", "message route")?.clone())?;
650
651 Ok(MessageRoute::new(source, target))
652}
653
654fn endpoint_from_val(value: Val) -> Result<MessageEndpoint> {
655 let fields: Vec<(String, Val)> = record_fields(value, "message endpoint")?;
656 let node_id: NodeId = NodeId::new(required_string_field(
657 &fields,
658 "node-id",
659 "message endpoint",
660 )?)?;
661 let port_id: PortId = PortId::new(required_string_field(
662 &fields,
663 "port-id",
664 "message endpoint",
665 )?)?;
666
667 Ok(MessageEndpoint::new(node_id, port_id))
668}
669
670fn payload_from_val(value: Val) -> Result<WitPayload> {
671 let (name, payload): (String, Option<Box<Val>>) = match value {
672 Val::Variant(name, payload) => (name, payload),
673 _ => {
674 return Err(PureflowError::execution(
675 "guest returned non-variant payload",
676 ));
677 }
678 };
679 match (name.as_str(), payload) {
680 ("bytes", Some(value)) => Ok(WitPayload::Bytes(bytes_from_val(*value)?)),
681 ("control", Some(value)) => {
682 let value: String = match *value {
683 Val::String(value) => value,
684 _ => {
685 return Err(PureflowError::execution(
686 "guest returned non-string control payload",
687 ));
688 }
689 };
690 Ok(WitPayload::Control(value))
691 }
692 (kind, _) => Err(PureflowError::execution(format!(
693 "guest returned unsupported payload variant: {kind}"
694 ))),
695 }
696}
697
698fn bytes_from_val(value: Val) -> Result<Vec<u8>> {
699 let values: Vec<Val> = match value {
700 Val::List(values) => values,
701 _ => {
702 return Err(PureflowError::execution(
703 "guest returned non-list bytes payload",
704 ));
705 }
706 };
707 values
708 .into_iter()
709 .map(|value: Val| match value {
710 Val::U8(byte) => Ok(byte),
711 _ => Err(PureflowError::execution(
712 "guest returned non-u8 byte payload element",
713 )),
714 })
715 .collect()
716}
717
718fn record_fields(value: Val, context: &str) -> Result<Vec<(String, Val)>> {
719 let fields: Vec<(String, Val)> = match value {
720 Val::Record(fields) => fields,
721 _ => {
722 return Err(PureflowError::execution(format!(
723 "guest returned non-record {context}"
724 )));
725 }
726 };
727 Ok(fields)
728}
729
730fn required_field<'a>(fields: &'a [(String, Val)], name: &str, context: &str) -> Result<&'a Val> {
731 fields
732 .iter()
733 .find_map(|(field_name, value): &(String, Val)| (field_name == name).then_some(value))
734 .ok_or_else(|| PureflowError::execution(format!("guest omitted {context} field {name}")))
735}
736
737fn required_string_field(fields: &[(String, Val)], name: &str, context: &str) -> Result<String> {
738 match required_field(fields, name, context)? {
739 Val::String(value) => Ok(value.clone()),
740 _ => Err(PureflowError::execution(format!(
741 "guest returned non-string {context} field {name}"
742 ))),
743 }
744}
745
746fn required_u32_field(fields: &[(String, Val)], name: &str, context: &str) -> Result<u32> {
747 match required_field(fields, name, context)? {
748 Val::U32(value) => Ok(*value),
749 _ => Err(PureflowError::execution(format!(
750 "guest returned non-u32 {context} field {name}"
751 ))),
752 }
753}
754
755fn required_list_field(fields: &[(String, Val)], name: &str, context: &str) -> Result<Vec<Val>> {
756 match required_field(fields, name, context)? {
757 Val::List(values) => Ok(values.clone()),
758 _ => Err(PureflowError::execution(format!(
759 "guest returned non-list {context} field {name}"
760 ))),
761 }
762}
763
764#[allow(clippy::match_wildcard_for_single_variants)]
765fn to_wit_packet(packet: &PortPacket) -> Result<WitPacket> {
766 let payload: WitPayload = match packet.payload() {
767 PacketPayload::Bytes(bytes) => WitPayload::Bytes(bytes.to_vec()),
768 PacketPayload::Control(value) => WitPayload::Control(value.to_string()),
769 #[allow(unreachable_patterns)]
770 _ => {
771 return Err(PureflowError::execution(
772 "payload is not supported by WIT ABI 0.1.0",
773 ));
774 }
775 };
776
777 Ok(WitPacket {
778 metadata: packet.metadata().clone(),
779 payload,
780 })
781}
782
783fn from_wit_packet(packet: WitPacket) -> Result<PortPacket> {
784 let payload: PacketPayload = match packet.payload {
785 WitPayload::Bytes(bytes) => PacketPayload::from(bytes),
786 WitPayload::Control(value) => {
787 let value: Value = serde_json::from_str(&value).map_err(|err: serde_json::Error| {
788 PureflowError::execution(format!("guest returned invalid control payload: {err}"))
789 })?;
790 PacketPayload::from(value)
791 }
792 };
793
794 Ok(PortPacket::new(packet.metadata, payload))
795}
796
797#[cfg(test)]
798mod tests {
799 use super::*;
800 use pureflow_core::{
801 capability::{EffectCapability, PortCapability, PortCapabilityDirection},
802 context::{CancellationRequest, CancellationToken, ExecutionAttempt, ExecutionMetadata},
803 message::{MessageEndpoint, MessageMetadata, MessageRoute},
804 };
805 use pureflow_types::{ExecutionId, MessageId, NodeId, WorkflowId};
806 use quickcheck::{Arbitrary, Gen, QuickCheck};
807 use serde::Deserialize;
808 use serde_json::json;
809 use std::{
810 collections::BTreeMap,
811 env,
812 ffi::OsString,
813 fs,
814 num::NonZeroU32,
815 path::{Path, PathBuf},
816 process::{Command, Output},
817 sync::OnceLock,
818 };
819
820 const UPPERCASE_FIXTURE_INPUTS_JSON: &str =
821 include_str!("../fixtures/uppercase-guest/testdata/inputs.json");
822 const UPPERCASE_FIXTURE_EXPECTED_OUTPUTS_JSON: &str =
823 include_str!("../fixtures/uppercase-guest/testdata/expected-outputs.json");
824 const UPPERCASE_FIXTURE_MANIFEST: &str = "fixtures/uppercase-guest/Cargo.toml";
825 const UPPERCASE_FIXTURE_ARTIFACT: &str =
826 "wasm32-wasip2/release/pureflow_wasm_uppercase_guest_fixture.wasm";
827 static QUICKCHECK_UPPERCASE_COMPONENT: OnceLock<WasmtimeBatchComponent> = OnceLock::new();
828
829 #[derive(Debug, Deserialize)]
830 struct FixturePortBatch {
831 #[serde(rename = "port-id")]
832 port_id: String,
833 packets: Vec<FixturePacket>,
834 }
835
836 impl FixturePortBatch {
837 fn into_wit(self) -> WitPortBatch {
838 WitPortBatch {
839 port_id: self.port_id,
840 packets: self
841 .packets
842 .into_iter()
843 .map(FixturePacket::into_wit)
844 .collect(),
845 }
846 }
847 }
848
849 #[derive(Debug, Deserialize)]
850 struct FixturePacket {
851 metadata: FixtureMessageMetadata,
852 payload: FixturePayload,
853 }
854
855 impl FixturePacket {
856 fn into_wit(self) -> WitPacket {
857 WitPacket {
858 metadata: self.metadata.into_message_metadata(),
859 payload: self.payload.into_wit(),
860 }
861 }
862 }
863
864 #[derive(Debug, Deserialize)]
865 struct FixtureMessageMetadata {
866 #[serde(rename = "message-id")]
867 message_id: String,
868 #[serde(rename = "workflow-id")]
869 workflow_id: String,
870 execution: FixtureExecution,
871 route: FixtureRoute,
872 }
873
874 impl FixtureMessageMetadata {
875 fn into_message_metadata(self) -> MessageMetadata {
876 MessageMetadata::new(
877 message_id(&self.message_id),
878 workflow_id(&self.workflow_id),
879 self.execution.into_execution_metadata(),
880 self.route.into_message_route(),
881 )
882 }
883 }
884
885 #[derive(Debug, Deserialize)]
886 struct FixtureExecution {
887 #[serde(rename = "execution-id")]
888 execution_id: String,
889 attempt: u32,
890 }
891
892 impl FixtureExecution {
893 fn into_execution_metadata(self) -> ExecutionMetadata {
894 let attempt: NonZeroU32 =
895 NonZeroU32::new(self.attempt).expect("fixture attempt must be non-zero");
896 ExecutionMetadata::new(
897 execution_id(&self.execution_id),
898 ExecutionAttempt::new(attempt),
899 )
900 }
901 }
902
903 #[derive(Debug, Deserialize)]
904 struct FixtureRoute {
905 source: Option<FixtureEndpoint>,
906 target: FixtureEndpoint,
907 }
908
909 impl FixtureRoute {
910 fn into_message_route(self) -> MessageRoute {
911 MessageRoute::new(
912 self.source.map(FixtureEndpoint::into_message_endpoint),
913 self.target.into_message_endpoint(),
914 )
915 }
916 }
917
918 #[derive(Debug, Deserialize)]
919 struct FixtureEndpoint {
920 #[serde(rename = "node-id")]
921 node_id: String,
922 #[serde(rename = "port-id")]
923 port_id: String,
924 }
925
926 impl FixtureEndpoint {
927 fn into_message_endpoint(self) -> MessageEndpoint {
928 MessageEndpoint::new(node_id(&self.node_id), port_id(&self.port_id))
929 }
930 }
931
932 #[derive(Debug, Deserialize)]
933 #[serde(rename_all = "kebab-case")]
934 enum FixturePayload {
935 Bytes(Vec<u8>),
936 Control(String),
937 }
938
939 impl FixturePayload {
940 fn into_wit(self) -> WitPayload {
941 match self {
942 Self::Bytes(bytes) => WitPayload::Bytes(bytes),
943 Self::Control(value) => WitPayload::Control(value),
944 }
945 }
946 }
947
948 #[derive(Debug, Clone)]
949 struct GeneratedBatch {
950 ports: Vec<GeneratedPortBatch>,
951 }
952
953 impl GeneratedBatch {
954 fn into_wit(self) -> Vec<WitPortBatch> {
955 self.ports
956 .into_iter()
957 .map(GeneratedPortBatch::into_wit)
958 .collect()
959 }
960 }
961
962 impl Arbitrary for GeneratedBatch {
963 fn arbitrary(g: &mut Gen) -> Self {
964 let port_count = usize::arbitrary(g) % 4;
965 let ports = (0..port_count)
966 .map(|port_index| GeneratedPortBatch::arbitrary(g, port_index))
967 .collect();
968
969 Self { ports }
970 }
971 }
972
973 #[derive(Debug, Clone)]
974 struct GeneratedPortBatch {
975 port_id: String,
976 packets: Vec<GeneratedPacket>,
977 }
978
979 impl GeneratedPortBatch {
980 fn arbitrary(g: &mut Gen, port_index: usize) -> Self {
981 let packet_count = usize::arbitrary(g) % 5;
982 let port_id = format!("in{port_index}");
983 let packets = (0..packet_count)
984 .map(|packet_index| GeneratedPacket::arbitrary(g, port_index, packet_index))
985 .collect();
986
987 Self { port_id, packets }
988 }
989
990 fn into_wit(self) -> WitPortBatch {
991 WitPortBatch {
992 port_id: self.port_id,
993 packets: self
994 .packets
995 .into_iter()
996 .map(GeneratedPacket::into_wit)
997 .collect(),
998 }
999 }
1000 }
1001
1002 #[derive(Debug, Clone)]
1003 struct GeneratedPacket {
1004 metadata: MessageMetadata,
1005 bytes: Vec<u8>,
1006 }
1007
1008 impl GeneratedPacket {
1009 fn arbitrary(g: &mut Gen, port_index: usize, packet_index: usize) -> Self {
1010 let byte_count = usize::arbitrary(g) % 65;
1011 let bytes = (0..byte_count).map(|_| u8::arbitrary(g)).collect();
1012 let source: MessageEndpoint = MessageEndpoint::new(node_id("source"), port_id("out"));
1013 let target: MessageEndpoint =
1014 MessageEndpoint::new(node_id("wasm"), port_id(&format!("in{port_index}")));
1015 let route: MessageRoute = MessageRoute::new(Some(source), target);
1016 let execution: ExecutionMetadata =
1017 ExecutionMetadata::first_attempt(execution_id("run-quickcheck"));
1018 let metadata = MessageMetadata::new(
1019 message_id(&format!("msg-{port_index}-{packet_index}")),
1020 workflow_id("flow-quickcheck"),
1021 execution,
1022 route,
1023 );
1024
1025 Self { metadata, bytes }
1026 }
1027
1028 fn into_wit(self) -> WitPacket {
1029 WitPacket {
1030 metadata: self.metadata,
1031 payload: WitPayload::Bytes(self.bytes),
1032 }
1033 }
1034 }
1035
1036 fn execution_id(value: &str) -> ExecutionId {
1037 ExecutionId::new(value).expect("valid execution id")
1038 }
1039
1040 fn message_id(value: &str) -> MessageId {
1041 MessageId::new(value).expect("valid message id")
1042 }
1043
1044 fn node_id(value: &str) -> NodeId {
1045 NodeId::new(value).expect("valid node id")
1046 }
1047
1048 fn port_id(value: &str) -> PortId {
1049 PortId::new(value).expect("valid port id")
1050 }
1051
1052 fn workflow_id(value: &str) -> WorkflowId {
1053 WorkflowId::new(value).expect("valid workflow id")
1054 }
1055
1056 fn metadata() -> MessageMetadata {
1057 let source: MessageEndpoint = MessageEndpoint::new(node_id("source"), port_id("out"));
1058 let target: MessageEndpoint = MessageEndpoint::new(node_id("wasm"), port_id("in"));
1059 let route: MessageRoute = MessageRoute::new(Some(source), target);
1060 let execution: ExecutionMetadata = ExecutionMetadata::first_attempt(execution_id("run-1"));
1061 MessageMetadata::new(message_id("msg-1"), workflow_id("flow"), execution, route)
1062 }
1063
1064 fn fixture_port_batches_from_json(json: &str) -> Vec<WitPortBatch> {
1065 serde_json::from_str::<Vec<FixturePortBatch>>(json)
1066 .expect("fixture JSON must parse")
1067 .into_iter()
1068 .map(FixturePortBatch::into_wit)
1069 .collect()
1070 }
1071
1072 fn batch_inputs_from_wit_port_batches(port_batches: Vec<WitPortBatch>) -> BatchInputs {
1073 let mut packets_by_port: BTreeMap<PortId, Vec<PortPacket>> = BTreeMap::new();
1074 for port_batch in port_batches {
1075 let port_id: PortId = port_id(&port_batch.port_id);
1076 let packets: Vec<PortPacket> = port_batch
1077 .packets
1078 .into_iter()
1079 .map(|packet: WitPacket| from_wit_packet(packet).expect("fixture packet decodes"))
1080 .collect();
1081 packets_by_port.insert(port_id, packets);
1082 }
1083
1084 BatchInputs::from_packets(packets_by_port)
1085 }
1086
1087 fn uppercase_fixture_outputs(inputs: &[WitPortBatch]) -> Vec<WitPortBatch> {
1088 let mut packets: Vec<WitPacket> = Vec::new();
1089
1090 for port_batch in inputs {
1091 for packet in &port_batch.packets {
1092 let mut packet: WitPacket = packet.clone();
1093 let WitPayload::Bytes(bytes) = packet.payload else {
1094 panic!("uppercase fixture success vectors must contain only byte payloads");
1095 };
1096 packet.payload = WitPayload::Bytes(
1097 bytes
1098 .into_iter()
1099 .map(|byte: u8| byte.to_ascii_uppercase())
1100 .collect(),
1101 );
1102 packets.push(packet);
1103 }
1104 }
1105
1106 vec![WitPortBatch {
1107 port_id: "out".to_owned(),
1108 packets,
1109 }]
1110 }
1111
1112 fn build_uppercase_guest_fixture() -> PathBuf {
1113 let crate_dir: PathBuf = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1114 let manifest_path: PathBuf = crate_dir.join(UPPERCASE_FIXTURE_MANIFEST);
1115 let target_dir: PathBuf = env::temp_dir().join(format!(
1116 "pureflow-wasm-uppercase-guest-fixture-{}",
1117 std::process::id()
1118 ));
1119 let artifact_path: PathBuf = target_dir.join(UPPERCASE_FIXTURE_ARTIFACT);
1120 let cargo: OsString = env::var_os("CARGO").unwrap_or_else(|| OsString::from("cargo"));
1121 let output: Output = Command::new(cargo)
1122 .args([
1123 "build",
1124 "--manifest-path",
1125 path_as_str(&manifest_path),
1126 "--target",
1127 "wasm32-wasip2",
1128 "--release",
1129 "--target-dir",
1130 path_as_str(&target_dir),
1131 ])
1132 .env_remove("RUSTFLAGS")
1133 .output()
1134 .expect("fixture build command should run");
1135
1136 assert!(
1137 output.status.success(),
1138 "fixture build failed\nstdout:\n{}\nstderr:\n{}",
1139 String::from_utf8_lossy(&output.stdout),
1140 String::from_utf8_lossy(&output.stderr),
1141 );
1142 assert!(
1143 artifact_path.is_file(),
1144 "fixture artifact was not written to {}",
1145 artifact_path.display(),
1146 );
1147
1148 artifact_path
1149 }
1150
1151 fn wasm32_wasip2_target_available() -> bool {
1152 let rustc: OsString = env::var_os("RUSTC").unwrap_or_else(|| OsString::from("rustc"));
1153 let Ok(output) = Command::new(rustc)
1154 .args(["--print", "target-libdir", "--target", "wasm32-wasip2"])
1155 .env_remove("RUSTFLAGS")
1156 .output()
1157 else {
1158 return false;
1159 };
1160 if !output.status.success() {
1161 return false;
1162 }
1163 let libdir: PathBuf = PathBuf::from(String::from_utf8_lossy(&output.stdout).trim());
1164 fs::read_dir(libdir).is_ok_and(|entries| {
1165 entries.filter_map(std::result::Result::ok).any(|entry| {
1166 entry.file_name().to_str().is_some_and(|name| {
1167 name.starts_with("libcore-")
1168 && Path::new(name)
1169 .extension()
1170 .is_some_and(|extension| extension.eq_ignore_ascii_case("rlib"))
1171 })
1172 })
1173 })
1174 }
1175
1176 fn path_as_str(path: &Path) -> &str {
1177 path.to_str().expect("fixture path should be UTF-8")
1178 }
1179
1180 #[test]
1181 fn constants_name_the_wit_abi() {
1182 assert_eq!(WIT_PACKAGE, "pureflow:batch@0.1.0");
1183 assert_eq!(WIT_WORLD, "pureflow-node");
1184 }
1185
1186 #[test]
1187 fn wasm_capabilities_accept_import_free_descriptor() {
1188 let capabilities: NodeCapabilities = NodeCapabilities::native_passive(
1189 node_id("wasm"),
1190 [
1191 PortCapability::new(port_id("in"), PortCapabilityDirection::Receive),
1192 PortCapability::new(port_id("out"), PortCapabilityDirection::Emit),
1193 ],
1194 )
1195 .expect("valid capabilities");
1196
1197 validate_wasm_capabilities(&capabilities).expect("no host imports required");
1198 }
1199
1200 #[test]
1201 fn wasm_capabilities_reject_effects_without_imports() {
1202 let capabilities: NodeCapabilities = NodeCapabilities::new(
1203 node_id("wasm"),
1204 [PortCapability::new(
1205 port_id("in"),
1206 PortCapabilityDirection::Receive,
1207 )],
1208 [EffectCapability::Clock],
1209 )
1210 .expect("valid descriptor shape");
1211
1212 let err: PureflowError =
1213 validate_wasm_capabilities(&capabilities).expect_err("effect must be denied");
1214
1215 assert_eq!(err.code(), pureflow_core::ErrorCode::InvalidCapabilities);
1216 assert!(err.to_string().contains("not enforceable"));
1217 }
1218
1219 #[test]
1220 fn port_batches_round_trip_bytes_and_control_payloads() {
1221 let mut inputs: BatchInputs = BatchInputs::new();
1222 inputs.push(
1223 port_id("in"),
1224 PortPacket::new(
1225 metadata(),
1226 PacketPayload::from(b"bytes".as_slice().to_vec()),
1227 ),
1228 );
1229 inputs.push(
1230 port_id("control"),
1231 PortPacket::new(metadata(), PacketPayload::from(json!({"op": "flush"}))),
1232 );
1233
1234 let wit_batches: Vec<WitPortBatch> =
1235 to_wit_port_batches(&inputs).expect("inputs should encode as WIT batches");
1236 let outputs: BatchOutputs =
1237 from_wit_port_batches(wit_batches).expect("WIT batches should decode");
1238
1239 assert_eq!(outputs.packets(&port_id("in")).len(), 1);
1240 assert_eq!(outputs.packets(&port_id("control")).len(), 1);
1241 }
1242
1243 #[test]
1244 fn invalid_control_payload_is_rejected() {
1245 let packet: WitPacket = WitPacket {
1246 metadata: metadata(),
1247 payload: WitPayload::Control("not-json".to_owned()),
1248 };
1249
1250 let err: PureflowError = from_wit_packet(packet).expect_err("invalid JSON should fail");
1251
1252 assert_eq!(err.code(), pureflow_core::ErrorCode::NodeExecutionFailed);
1253 }
1254
1255 #[test]
1256 fn dynamic_result_value_decodes_outputs() {
1257 let output = WitPortBatch {
1258 port_id: "out".to_owned(),
1259 packets: vec![WitPacket {
1260 metadata: metadata(),
1261 payload: WitPayload::Bytes(b"payload".to_vec()),
1262 }],
1263 };
1264 let result = Val::Result(Ok(Some(Box::new(Val::List(vec![port_batch_to_val(
1265 output,
1266 )])))));
1267
1268 let outputs = batch_outputs_from_result_val(result).expect("result should decode");
1269
1270 assert_eq!(outputs.packets(&port_id("out")).len(), 1);
1271 }
1272
1273 #[test]
1274 fn dynamic_guest_error_maps_to_execution_error() {
1275 let result = Val::Result(Err(Some(Box::new(Val::Variant(
1276 "guest-failure".to_owned(),
1277 Some(Box::new(Val::String("boom".to_owned()))),
1278 )))));
1279
1280 let err = batch_outputs_from_result_val(result).expect_err("guest error should fail");
1281
1282 assert_eq!(err.code(), pureflow_core::ErrorCode::NodeExecutionFailed);
1283 }
1284
1285 #[test]
1286 fn uppercase_guest_fixture_testdata_matches_wit_shape() {
1287 let inputs: Vec<WitPortBatch> =
1288 fixture_port_batches_from_json(UPPERCASE_FIXTURE_INPUTS_JSON);
1289 let expected_outputs: Vec<WitPortBatch> =
1290 fixture_port_batches_from_json(UPPERCASE_FIXTURE_EXPECTED_OUTPUTS_JSON);
1291
1292 assert_eq!(inputs.len(), 1);
1293 assert_eq!(inputs[0].port_id, "in");
1294 assert_eq!(expected_outputs, uppercase_fixture_outputs(&inputs));
1295
1296 let outputs: BatchOutputs =
1297 from_wit_port_batches(expected_outputs).expect("expected fixture outputs must decode");
1298 assert_eq!(outputs.packets(&port_id("out")).len(), 2);
1299 }
1300
1301 #[test]
1302 fn wasmtime_adapter_invokes_real_uppercase_guest_fixture() {
1303 if !wasm32_wasip2_target_available() {
1304 eprintln!(
1305 "skipping real WASM guest conformance test; run through `nix develop .` to provide wasm32-wasip2"
1306 );
1307 return;
1308 }
1309
1310 let fixture_path: PathBuf = build_uppercase_guest_fixture();
1311 let fixture_bytes: Vec<u8> = fs::read(fixture_path).expect("fixture component is readable");
1312 let component: WasmtimeBatchComponent =
1313 WasmtimeBatchComponent::from_component_bytes(fixture_bytes)
1314 .expect("fixture component compiles");
1315 let empty_outputs: BatchOutputs = component
1316 .invoke(&BatchInputs::new())
1317 .expect("fixture guest accepts an empty batch");
1318 assert!(empty_outputs.packets(&port_id("out")).is_empty());
1319
1320 let inputs: Vec<WitPortBatch> =
1321 fixture_port_batches_from_json(UPPERCASE_FIXTURE_INPUTS_JSON);
1322 let expected_outputs: BatchOutputs = from_wit_port_batches(fixture_port_batches_from_json(
1323 UPPERCASE_FIXTURE_EXPECTED_OUTPUTS_JSON,
1324 ))
1325 .expect("expected fixture outputs decode");
1326
1327 let actual_outputs: BatchOutputs = component
1328 .invoke(&batch_inputs_from_wit_port_batches(inputs))
1329 .expect("fixture guest invocation succeeds");
1330
1331 assert_eq!(actual_outputs, expected_outputs);
1332 }
1333
1334 #[test]
1335 fn wasmtime_adapter_rejects_pre_cancelled_invocation() {
1336 if !wasm32_wasip2_target_available() {
1337 eprintln!(
1338 "skipping WASM cancellation test; run through `nix develop .` to provide wasm32-wasip2"
1339 );
1340 return;
1341 }
1342
1343 let fixture_path: PathBuf = build_uppercase_guest_fixture();
1344 let fixture_bytes: Vec<u8> = fs::read(fixture_path).expect("fixture component is readable");
1345 let component: WasmtimeBatchComponent =
1346 WasmtimeBatchComponent::from_component_bytes(fixture_bytes)
1347 .expect("fixture component compiles");
1348 let cancellation: CancellationToken =
1349 CancellationToken::cancelled(CancellationRequest::new("test shutdown"));
1350
1351 let err: PureflowError = component
1352 .invoke_with_cancellation(&BatchInputs::new(), &cancellation)
1353 .expect_err("pre-cancelled invocation must fail before guest execution");
1354
1355 assert_eq!(err.code(), pureflow_core::ErrorCode::ExecutionCancelled);
1356 assert!(err.to_string().contains("test shutdown"));
1357 }
1358
1359 #[test]
1360 fn wasmtime_adapter_reports_stable_fuel_limit_error() {
1361 if !wasm32_wasip2_target_available() {
1362 eprintln!(
1363 "skipping WASM fuel limit test; run through `nix develop .` to provide wasm32-wasip2"
1364 );
1365 return;
1366 }
1367
1368 let fixture_path: PathBuf = build_uppercase_guest_fixture();
1369 let fixture_bytes: Vec<u8> = fs::read(fixture_path).expect("fixture component is readable");
1370 let component: WasmtimeBatchComponent =
1371 WasmtimeBatchComponent::from_component_bytes_with_limits(
1372 fixture_bytes,
1373 WasmtimeExecutionLimits::new(0),
1374 )
1375 .expect("fixture component compiles");
1376 let inputs: Vec<WitPortBatch> =
1377 fixture_port_batches_from_json(UPPERCASE_FIXTURE_INPUTS_JSON);
1378
1379 let err: PureflowError = component
1380 .invoke(&batch_inputs_from_wit_port_batches(inputs))
1381 .expect_err("zero fuel should trap with a stable host error");
1382
1383 assert_eq!(err.code(), pureflow_core::ErrorCode::NodeExecutionFailed);
1384 assert!(
1385 err.to_string()
1386 .contains("guest exceeded Wasmtime fuel limit of 0 units"),
1387 "unexpected fuel error: {err}"
1388 );
1389 }
1390
1391 #[test]
1392 fn wasmtime_adapter_preserves_generated_byte_batches_across_component_boundary() {
1393 if !wasm32_wasip2_target_available() {
1394 eprintln!(
1395 "skipping generated WASM boundary conformance test; run through `nix develop .` to provide wasm32-wasip2"
1396 );
1397 return;
1398 }
1399
1400 let fixture_path: PathBuf = build_uppercase_guest_fixture();
1401 let fixture_bytes: Vec<u8> = fs::read(fixture_path).expect("fixture component is readable");
1402 let component: WasmtimeBatchComponent =
1403 WasmtimeBatchComponent::from_component_bytes(fixture_bytes)
1404 .expect("fixture component compiles");
1405 let _ = QUICKCHECK_UPPERCASE_COMPONENT.set(component);
1406
1407 QuickCheck::new()
1408 .tests(32)
1409 .quickcheck(generated_byte_batch_boundary_holds as fn(GeneratedBatch) -> bool);
1410 }
1411
1412 fn generated_byte_batch_boundary_holds(generated: GeneratedBatch) -> bool {
1413 let component = QUICKCHECK_UPPERCASE_COMPONENT
1414 .get()
1415 .expect("quickcheck component initialized");
1416 let inputs: Vec<WitPortBatch> = generated.into_wit();
1417 let expected_outputs: BatchOutputs =
1418 from_wit_port_batches(uppercase_fixture_outputs(&inputs))
1419 .expect("generated expected outputs decode");
1420 let actual_outputs: BatchOutputs = component
1421 .invoke(&batch_inputs_from_wit_port_batches(inputs))
1422 .expect("generated fixture guest invocation succeeds");
1423
1424 actual_outputs == expected_outputs
1425 }
1426}