1use bytes::Bytes;
4use pureflow_types::{MessageId, NodeId, PortId, WorkflowId};
5use serde_json::Value;
6
7use crate::context::ExecutionMetadata;
8
9#[derive(Debug, Clone, PartialEq)]
11#[cfg_attr(not(feature = "arrow"), derive(Eq))]
12pub enum PacketPayload {
13 Bytes(Bytes),
15 Control(Value),
17 #[cfg(feature = "arrow")]
19 Arrow(arrow_array::RecordBatch),
20}
21
22impl PacketPayload {
23 #[must_use]
25 pub fn bytes(value: impl Into<Bytes>) -> Self {
26 Self::Bytes(value.into())
27 }
28
29 #[must_use]
31 pub fn control(value: impl Into<Value>) -> Self {
32 Self::Control(value.into())
33 }
34
35 #[must_use]
37 pub const fn as_bytes(&self) -> Option<&Bytes> {
38 match self {
39 Self::Bytes(bytes) => Some(bytes),
40 Self::Control(_) => None,
41 #[cfg(feature = "arrow")]
42 Self::Arrow(_) => None,
43 }
44 }
45
46 #[must_use]
48 pub const fn as_control(&self) -> Option<&Value> {
49 match self {
50 Self::Bytes(_) => None,
51 Self::Control(value) => Some(value),
52 #[cfg(feature = "arrow")]
53 Self::Arrow(_) => None,
54 }
55 }
56
57 #[cfg(feature = "arrow")]
59 #[must_use]
60 pub const fn as_arrow(&self) -> Option<&arrow_array::RecordBatch> {
61 match self {
62 Self::Bytes(_) | Self::Control(_) => None,
63 Self::Arrow(batch) => Some(batch),
64 }
65 }
66}
67
68impl From<Bytes> for PacketPayload {
69 fn from(value: Bytes) -> Self {
70 Self::Bytes(value)
71 }
72}
73
74impl From<Vec<u8>> for PacketPayload {
75 fn from(value: Vec<u8>) -> Self {
76 Self::Bytes(Bytes::from(value))
77 }
78}
79
80impl From<&'static [u8]> for PacketPayload {
81 fn from(value: &'static [u8]) -> Self {
82 Self::Bytes(Bytes::from_static(value))
83 }
84}
85
86impl From<Value> for PacketPayload {
87 fn from(value: Value) -> Self {
88 Self::Control(value)
89 }
90}
91
92#[cfg(feature = "arrow")]
93impl From<arrow_array::RecordBatch> for PacketPayload {
94 fn from(value: arrow_array::RecordBatch) -> Self {
95 Self::Arrow(value)
96 }
97}
98
99#[derive(Debug, Clone, PartialEq, Eq)]
101pub struct MessageEndpoint {
102 node_id: NodeId,
103 port_id: PortId,
104}
105
106impl MessageEndpoint {
107 #[must_use]
109 pub const fn new(node_id: NodeId, port_id: PortId) -> Self {
110 Self { node_id, port_id }
111 }
112
113 #[must_use]
115 pub const fn node_id(&self) -> &NodeId {
116 &self.node_id
117 }
118
119 #[must_use]
121 pub const fn port_id(&self) -> &PortId {
122 &self.port_id
123 }
124}
125
126#[derive(Debug, Clone, PartialEq, Eq)]
128pub struct MessageRoute {
129 source: Option<MessageEndpoint>,
130 target: MessageEndpoint,
131}
132
133impl MessageRoute {
134 #[must_use]
136 pub const fn new(source: Option<MessageEndpoint>, target: MessageEndpoint) -> Self {
137 Self { source, target }
138 }
139
140 #[must_use]
142 pub const fn source(&self) -> Option<&MessageEndpoint> {
143 self.source.as_ref()
144 }
145
146 #[must_use]
148 pub const fn target(&self) -> &MessageEndpoint {
149 &self.target
150 }
151}
152
153#[derive(Debug, Clone, PartialEq, Eq)]
155pub struct MessageMetadata {
156 message_id: MessageId,
157 workflow_id: WorkflowId,
158 execution: ExecutionMetadata,
159 route: MessageRoute,
160}
161
162impl MessageMetadata {
163 #[must_use]
165 pub const fn new(
166 message_id: MessageId,
167 workflow_id: WorkflowId,
168 execution: ExecutionMetadata,
169 route: MessageRoute,
170 ) -> Self {
171 Self {
172 message_id,
173 workflow_id,
174 execution,
175 route,
176 }
177 }
178
179 #[must_use]
181 pub const fn message_id(&self) -> &MessageId {
182 &self.message_id
183 }
184
185 #[must_use]
187 pub const fn workflow_id(&self) -> &WorkflowId {
188 &self.workflow_id
189 }
190
191 #[must_use]
193 pub const fn execution(&self) -> &ExecutionMetadata {
194 &self.execution
195 }
196
197 #[must_use]
199 pub const fn route(&self) -> &MessageRoute {
200 &self.route
201 }
202}
203
204#[derive(Debug, Clone, PartialEq, Eq)]
206pub struct MessageEnvelope<P> {
207 metadata: MessageMetadata,
208 payload: P,
209}
210
211impl<P> MessageEnvelope<P> {
212 #[must_use]
214 pub const fn new(metadata: MessageMetadata, payload: P) -> Self {
215 Self { metadata, payload }
216 }
217
218 #[must_use]
220 pub const fn metadata(&self) -> &MessageMetadata {
221 &self.metadata
222 }
223
224 #[must_use]
226 pub const fn payload(&self) -> &P {
227 &self.payload
228 }
229
230 #[must_use]
232 pub fn into_payload(self) -> P {
233 self.payload
234 }
235
236 #[must_use]
238 pub fn map_payload<Q>(self, f: impl FnOnce(P) -> Q) -> MessageEnvelope<Q> {
239 MessageEnvelope {
240 metadata: self.metadata,
241 payload: f(self.payload),
242 }
243 }
244}
245
246#[cfg(test)]
247mod tests {
248 use super::*;
249 use pureflow_types::ExecutionId;
250 use serde_json::json;
251
252 fn execution_id(value: &str) -> ExecutionId {
253 ExecutionId::new(value).expect("valid execution id")
254 }
255
256 fn message_id(value: &str) -> MessageId {
257 MessageId::new(value).expect("valid message id")
258 }
259
260 fn node_id(value: &str) -> NodeId {
261 NodeId::new(value).expect("valid node id")
262 }
263
264 fn port_id(value: &str) -> PortId {
265 PortId::new(value).expect("valid port id")
266 }
267
268 fn workflow_id(value: &str) -> WorkflowId {
269 WorkflowId::new(value).expect("valid workflow id")
270 }
271
272 fn execution() -> ExecutionMetadata {
273 ExecutionMetadata::first_attempt(execution_id("run-1"))
274 }
275
276 #[test]
277 fn message_envelope_keeps_payload_separate_from_metadata() {
278 let target: MessageEndpoint = MessageEndpoint::new(node_id("consumer"), port_id("in"));
279 let route: MessageRoute = MessageRoute::new(None, target);
280 let metadata: MessageMetadata =
281 MessageMetadata::new(message_id("msg-1"), workflow_id("flow"), execution(), route);
282 let envelope: MessageEnvelope<&str> = MessageEnvelope::new(metadata, "payload");
283 let mapped: MessageEnvelope<usize> = envelope.map_payload(str::len);
284
285 assert_eq!(mapped.payload(), &7);
286 assert_eq!(mapped.metadata().message_id().as_str(), "msg-1");
287 assert_eq!(
288 mapped.metadata().route().target().node_id().as_str(),
289 "consumer"
290 );
291 }
292
293 #[test]
294 fn packet_payload_bytes_clone_and_slice_without_copying_user_data() {
295 let payload: PacketPayload = PacketPayload::bytes(Bytes::from_static(b"abcdef"));
296 let cloned: PacketPayload = payload.clone();
297 let sliced: Bytes = cloned
298 .as_bytes()
299 .expect("payload should contain bytes")
300 .slice(1..4);
301
302 assert_eq!(
303 payload
304 .as_bytes()
305 .expect("payload should contain bytes")
306 .as_ref(),
307 b"abcdef"
308 );
309 assert!(payload.as_control().is_none());
310 assert_eq!(sliced.as_ref(), b"bcd");
311 }
312
313 #[test]
314 fn packet_payload_control_carries_structured_values() {
315 let payload: PacketPayload = PacketPayload::control(json!({
316 "command": "flush",
317 "priority": 3,
318 }));
319 let control: &Value = payload
320 .as_control()
321 .expect("payload should contain control data");
322
323 assert_eq!(control["command"], "flush");
324 assert_eq!(control["priority"], 3);
325 assert!(payload.as_bytes().is_none());
326 }
327
328 #[cfg(feature = "arrow")]
329 #[test]
330 fn packet_payload_arrow_carries_record_batches() {
331 use std::sync::Arc;
332
333 use arrow_array::{Int32Array, RecordBatch};
334 use arrow_schema::{DataType, Field, Schema};
335
336 let schema = Arc::new(Schema::new(vec![Field::new(
337 "value",
338 DataType::Int32,
339 false,
340 )]));
341 let values = Arc::new(Int32Array::from(vec![1, 2, 3]));
342 let batch: RecordBatch =
343 RecordBatch::try_new(schema, vec![values]).expect("record batch should be valid");
344 let payload: PacketPayload = PacketPayload::from(batch.clone());
345
346 assert_eq!(payload.as_arrow(), Some(&batch));
347 assert!(payload.as_bytes().is_none());
348 assert!(payload.as_control().is_none());
349 }
350}