wasmflow_packet/
v1.rs

1use std::collections::HashMap;
2
3use serde::de::DeserializeOwned;
4use serde::{Deserialize, Serialize};
5use wasmflow_codec::messagepack::rmp_serialize;
6use wasmflow_codec::raw::raw_serialize;
7
8use crate::error::Error;
9use crate::Packet as RootPacket;
10
11#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
12#[must_use]
13/// A component's output data.
14pub enum Packet {
15  /// A successful message.
16  #[serde(rename = "0")]
17  Success(Serialized),
18
19  /// A message stemming from an error somewhere.
20  #[serde(rename = "1")]
21  Failure(Failure),
22
23  /// An error. Used by library authors to indicate a problem.
24  #[serde(rename = "2")]
25  Signal(Signal),
26}
27#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
28
29/// A success message.
30#[must_use]
31pub enum Serialized {
32  /// A message carrying a payload encoded with MessagePack.
33  #[serde(rename = "0")]
34  MessagePack(Vec<u8>),
35
36  /// A successful payload in a generic intermediary format.
37  #[serde(rename = "1")]
38  Struct(serde_value::Value),
39
40  /// A payload represented as a raw JSON String.
41  #[serde(rename = "2")]
42  Json(String),
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
46/// A Failure message.
47#[must_use]
48pub enum Failure {
49  /// Invalid payload. Used when a default message is unavoidable.
50  #[serde(rename = "0")]
51  Invalid,
52
53  /// A message carrying an exception (an error that short-circuited a port's downstream).
54  #[serde(rename = "1")]
55  Exception(String),
56
57  /// A message carrying an error (an error that short circuited all downstreams from a component).
58  #[serde(rename = "2")]
59  Error(String),
60}
61
62#[allow(missing_copy_implementations)]
63#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
64/// Internal signals that need to be handled before propagating to a downstream consumer.
65#[must_use]
66pub enum Signal {
67  /// Indicates the job that opened this port is finished with it.
68  #[serde(rename = "0")]
69  Done,
70
71  /// Indicates that a message is coming down in chunks and this is the start.
72  #[doc(hidden)]
73  #[serde(rename = "1")]
74  OpenBracket,
75
76  /// Indicates a chunked message has been completed.
77  #[serde(rename = "2")]
78  #[doc(hidden)]
79  CloseBracket,
80}
81
82impl Packet {
83  /// A one-liner to turn a serializable object into a [Serialized::MessagePack] variant.
84  pub fn messagepack<T: Serialize>(t: &T) -> Self {
85    match rmp_serialize(t) {
86      Ok(bytes) => Self::Success(Serialized::MessagePack(bytes)),
87      Err(e) => Self::Failure(Failure::Error(e.to_string())),
88    }
89  }
90
91  /// A one-liner to turn a serializable object into a [Packet::Success] variant.
92  pub fn success<T: Serialize>(t: &T) -> Self {
93    match raw_serialize(t) {
94      Ok(bytes) => Self::Success(Serialized::Struct(bytes)),
95      Err(e) => Self::Failure(Failure::Error(e.to_string())),
96    }
97  }
98
99  /// Creates a [Packet::Signal(Signal::Done)]
100  pub fn done() -> Self {
101    Self::Signal(Signal::Done)
102  }
103
104  /// Creates a [Packet::Failure(Failure::Exception)]
105  pub fn exception<T: AsRef<str>>(msg: T) -> Self {
106    Self::Failure(Failure::Exception(msg.as_ref().to_owned()))
107  }
108
109  /// Creates a [Packet::Failure(Failure::Error)]
110  pub fn error<T: AsRef<str>>(msg: T) -> Self {
111    Self::Failure(Failure::Error(msg.as_ref().to_owned()))
112  }
113
114  /// Try to deserialize a [Packet] into the target type
115  pub fn deserialize<T: DeserializeOwned>(self) -> Result<T, Error> {
116    try_from(self)
117  }
118}
119
120fn try_from<T: DeserializeOwned>(value: Packet) -> Result<T, Error> {
121  match value {
122    Packet::Success(success) => match success {
123      Serialized::MessagePack(v) => wasmflow_codec::messagepack::deserialize(&v).map_err(Error::DeserializationError),
124      Serialized::Struct(v) => wasmflow_codec::raw::deserialize(v).map_err(Error::DeserializationError),
125      Serialized::Json(v) => wasmflow_codec::json::deserialize(&v).map_err(Error::DeserializationError),
126    },
127    Packet::Failure(failure) => match failure {
128      Failure::Invalid => Err(Error::Invalid),
129      Failure::Exception(v) => Err(Error::Exception(v)),
130      Failure::Error(v) => Err(Error::Error(v)),
131    },
132    Packet::Signal(_) => Err(Error::Signal),
133  }
134}
135
136impl From<Packet> for RootPacket {
137  fn from(v: Packet) -> Self {
138    Self::V1(v)
139  }
140}
141
142impl From<Serialized> for Packet {
143  fn from(v: Serialized) -> Self {
144    Packet::Success(v)
145  }
146}
147
148impl From<Failure> for Packet {
149  fn from(v: Failure) -> Self {
150    Packet::Failure(v)
151  }
152}
153
154impl From<Signal> for Packet {
155  fn from(v: Signal) -> Self {
156    Packet::Signal(v)
157  }
158}
159
/// Converts a v0 payload into the equivalent v1 packet, variant for variant.
#[cfg(feature = "v0")]
impl From<super::v0::Payload> for Packet {
  fn from(p: super::v0::Payload) -> Self {
    use crate::v0::Payload as Legacy;

    match p {
      // Payload-carrying variants become successes.
      Legacy::MessagePack(bytes) => Self::Success(Serialized::MessagePack(bytes)),
      Legacy::Success(generic) => Self::Success(Serialized::Struct(generic)),
      Legacy::Json(text) => Self::Success(Serialized::Json(text)),

      // Error-like variants become failures.
      Legacy::Invalid => Self::Failure(Failure::Invalid),
      Legacy::Exception(msg) => Self::Failure(Failure::Exception(msg)),
      Legacy::Error(msg) => Self::Failure(Failure::Error(msg)),

      // Control variants become signals.
      Legacy::Done => Self::Signal(Signal::Done),
      Legacy::OpenBracket => Self::Signal(Signal::OpenBracket),
      Legacy::CloseBracket => Self::Signal(Signal::CloseBracket),
    }
  }
}
176
177#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
178/// A map of port names to packets.
179pub struct PacketMap {
180  inner: HashMap<String, Packet>,
181}
182
183impl PacketMap {
184  /// Create a new [PacketMap]
185  #[must_use]
186  pub fn new(map: HashMap<String, Packet>) -> Self {
187    Self { inner: map }
188  }
189
190  /// Remove a [Packet] from the [PacketMap].
191  #[must_use]
192  pub fn remove(&mut self, port: &str) -> Option<Packet> {
193    self.inner.remove(port)
194  }
195
196  /// Insert a [Packet] from the [PacketMap].
197  pub fn insert(&mut self, port: String, value: Packet) {
198    self.inner.insert(port, value);
199  }
200}
201
202impl IntoIterator for PacketMap {
203  type Item = (String, Packet);
204  type IntoIter = std::collections::hash_map::IntoIter<String, Packet>;
205
206  fn into_iter(self) -> Self::IntoIter {
207    self.inner.into_iter()
208  }
209}