wasmflow_packet/
packet.rs

1use std::collections::HashMap;
2
3use serde::de::DeserializeOwned;
4use serde::{Deserialize, Serialize};
5
6use crate::error::Error;
7#[cfg(feature = "v0")]
8pub use crate::v0;
9pub use crate::v1;
10
11#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
12#[must_use]
13/// The output payload that component's push out of output ports.
14pub enum Packet {
15  /// Version 0 of the payload format (unstable).
16  #[serde(rename = "v0")]
17  #[cfg(feature = "v0")]
18  V0(v0::Payload),
19  /// Version 1 of the payload format (alpha).
20  #[serde(rename = "v1")]
21  V1(v1::Packet),
22}
23
24impl Packet {
25  #[must_use]
26  /// Does the [Packet] signify the originating job is completed?.
27  pub fn is_done(&self) -> bool {
28    match self {
29      #[cfg(feature = "v0")]
30      Packet::V0(v) => matches!(v, v0::Payload::Done | v0::Payload::Error(_)),
31      Packet::V1(v) => matches!(
32        v,
33        v1::Packet::Signal(v1::Signal::Done) | v1::Packet::Failure(v1::Failure::Error(_))
34      ),
35    }
36  }
37
38  #[must_use]
39  /// Does the [Packet] signify the originating job is completed?.
40  pub fn is_signal(&self) -> bool {
41    match self {
42      #[cfg(feature = "v0")]
43      Packet::V0(v) => matches!(v, v0::Payload::Done),
44      Packet::V1(v) => matches!(v, v1::Packet::Signal(_)),
45    }
46  }
47
48  /// Create a Done signal packet.
49  pub fn done() -> Self {
50    Self::V1(v1::Packet::done())
51  }
52
53  /// Convert a messagepack encoded payload into a [Packet]
54  pub fn from_messagepack(bytes: &[u8]) -> Self {
55    match wasmflow_codec::messagepack::deserialize::<Packet>(bytes) {
56      Ok(packet) => packet,
57      Err(e) => Packet::V1(v1::Packet::error(format!("Error deserializing packet: {}", e))),
58    }
59  }
60
61  /// Converts the [Packet] into a messagepack-compatible transport.
62  pub fn to_messagepack(&mut self) {
63    match self {
64      #[cfg(feature = "v0")]
65      Packet::V0(_) => unimplemented!("Converted a V0 packet to messagepack is not implemented via this function."),
66      Packet::V1(v) => {
67        if let v1::Packet::Success(v) = v {
68          match v {
69            v1::Serialized::MessagePack(_) => { /* nothing */ }
70            v1::Serialized::Struct(v) => {
71              *self = v1::Packet::Success(v1::Serialized::MessagePack(
72                wasmflow_codec::messagepack::serialize(&v).unwrap(),
73              ))
74              .into();
75            }
76            v1::Serialized::Json(json) => {
77              *self = v1::Packet::Success(v1::Serialized::Json(wasmflow_codec::json::serialize(&json).unwrap())).into();
78            }
79          }
80        }
81      }
82    };
83  }
84
85  /// Try to deserialize a [Packet] into the target type
86  pub fn deserialize<T: DeserializeOwned>(self) -> Result<T, Error> {
87    try_from(self)
88  }
89}
90
91fn try_from<T: DeserializeOwned>(value: Packet) -> Result<T, Error> {
92  match value {
93    #[cfg(feature = "v0")]
94    Packet::V0(p) => p.deserialize(),
95
96    Packet::V1(p) => p.deserialize(),
97  }
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
101#[must_use]
102/// A [PacketWrapper] is a wrapper around a [Packet] with the port name embedded.
103pub struct PacketWrapper {
104  /// The port name.
105  pub port: String,
106  /// The wrapped packet [Packet].
107  pub payload: Packet,
108}
109
110impl PacketWrapper {
111  /// Create a new [PacketWrapper] by setting the packet directly.
112  pub fn new_raw(port: impl AsRef<str>, packet: Packet) -> Self {
113    PacketWrapper {
114      port: port.as_ref().to_owned(),
115      payload: packet,
116    }
117  }
118}
119
120#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
121#[must_use]
122/// A map of port names to packets.
123pub struct PacketMap {
124  inner: HashMap<String, Packet>,
125}
126
127impl PacketMap {
128  /// Constructor for a new [PacketMap]
129  pub fn new(map: HashMap<String, Packet>) -> Self {
130    Self { inner: map }
131  }
132
133  /// Remove a [Packet] from a [PacketMap]
134  #[must_use]
135  pub fn remove(&mut self, port: &str) -> Option<Packet> {
136    self.inner.remove(port)
137  }
138
139  /// Insert a [Packet] into a [PacketMap]
140  pub fn insert<T: AsRef<str>>(&mut self, port: T, value: impl Serialize) {
141    self
142      .inner
143      .insert(port.as_ref().to_owned(), Packet::V1(v1::Packet::success(&value)));
144  }
145}
146
147impl IntoIterator for PacketMap {
148  type Item = (String, Packet);
149  type IntoIter = std::collections::hash_map::IntoIter<String, Packet>;
150
151  fn into_iter(self) -> Self::IntoIter {
152    self.inner.into_iter()
153  }
154}
155
156impl<K: AsRef<str>, V: Serialize> From<Vec<(K, V)>> for PacketMap {
157  fn from(list: Vec<(K, V)>) -> Self {
158    let mut map = PacketMap::default();
159    for (k, v) in list {
160      map.insert(k, v);
161    }
162    map
163  }
164}
165
166impl<V, const N: usize> From<[(&str, V); N]> for PacketMap
167where
168  V: Serialize + Sized,
169{
170  fn from(list: [(&str, V); N]) -> Self {
171    let mut map = PacketMap::default();
172    for (k, v) in list {
173      map.insert(k, v);
174    }
175    map
176  }
177}