Skip to main content

pureflow_core/
message.rs

1//! Message envelope and routing metadata types.
2
3use bytes::Bytes;
4use pureflow_types::{MessageId, NodeId, PortId, WorkflowId};
5use serde_json::Value;
6
7use crate::context::ExecutionMetadata;
8
9/// Packet payload carried over runtime ports.
10#[derive(Debug, Clone, PartialEq)]
11#[cfg_attr(not(feature = "arrow"), derive(Eq))]
12pub enum PacketPayload {
13    /// Ordinary byte payload.
14    Bytes(Bytes),
15    /// Control-plane payload for orchestration messages.
16    Control(Value),
17    /// Apache Arrow record batch payload.
18    #[cfg(feature = "arrow")]
19    Arrow(arrow_array::RecordBatch),
20}
21
22impl PacketPayload {
23    /// Create a byte payload.
24    #[must_use]
25    pub fn bytes(value: impl Into<Bytes>) -> Self {
26        Self::Bytes(value.into())
27    }
28
29    /// Create a control-plane payload.
30    #[must_use]
31    pub fn control(value: impl Into<Value>) -> Self {
32        Self::Control(value.into())
33    }
34
35    /// Borrow the payload as bytes when this is a byte payload.
36    #[must_use]
37    pub const fn as_bytes(&self) -> Option<&Bytes> {
38        match self {
39            Self::Bytes(bytes) => Some(bytes),
40            Self::Control(_) => None,
41            #[cfg(feature = "arrow")]
42            Self::Arrow(_) => None,
43        }
44    }
45
46    /// Borrow the payload as control data when this is a control payload.
47    #[must_use]
48    pub const fn as_control(&self) -> Option<&Value> {
49        match self {
50            Self::Bytes(_) => None,
51            Self::Control(value) => Some(value),
52            #[cfg(feature = "arrow")]
53            Self::Arrow(_) => None,
54        }
55    }
56
57    /// Borrow the payload as an Arrow record batch when this is an Arrow payload.
58    #[cfg(feature = "arrow")]
59    #[must_use]
60    pub const fn as_arrow(&self) -> Option<&arrow_array::RecordBatch> {
61        match self {
62            Self::Bytes(_) | Self::Control(_) => None,
63            Self::Arrow(batch) => Some(batch),
64        }
65    }
66}
67
68impl From<Bytes> for PacketPayload {
69    fn from(value: Bytes) -> Self {
70        Self::Bytes(value)
71    }
72}
73
74impl From<Vec<u8>> for PacketPayload {
75    fn from(value: Vec<u8>) -> Self {
76        Self::Bytes(Bytes::from(value))
77    }
78}
79
80impl From<&'static [u8]> for PacketPayload {
81    fn from(value: &'static [u8]) -> Self {
82        Self::Bytes(Bytes::from_static(value))
83    }
84}
85
86impl From<Value> for PacketPayload {
87    fn from(value: Value) -> Self {
88        Self::Control(value)
89    }
90}
91
/// Converts an Arrow record batch into an Arrow payload.
///
/// Only available when the `arrow` feature is enabled.
#[cfg(feature = "arrow")]
impl From<arrow_array::RecordBatch> for PacketPayload {
    /// Store `value` unchanged in the [`PacketPayload::Arrow`] variant.
    fn from(value: arrow_array::RecordBatch) -> Self {
        let record_batch = value;
        Self::Arrow(record_batch)
    }
}
98
99/// Node/port endpoint for a message envelope.
100#[derive(Debug, Clone, PartialEq, Eq)]
101pub struct MessageEndpoint {
102    node_id: NodeId,
103    port_id: PortId,
104}
105
106impl MessageEndpoint {
107    /// Create a message endpoint.
108    #[must_use]
109    pub const fn new(node_id: NodeId, port_id: PortId) -> Self {
110        Self { node_id, port_id }
111    }
112
113    /// Node referenced by this endpoint.
114    #[must_use]
115    pub const fn node_id(&self) -> &NodeId {
116        &self.node_id
117    }
118
119    /// Port referenced by this endpoint.
120    #[must_use]
121    pub const fn port_id(&self) -> &PortId {
122        &self.port_id
123    }
124}
125
126/// Static routing metadata carried alongside a message payload.
127#[derive(Debug, Clone, PartialEq, Eq)]
128pub struct MessageRoute {
129    source: Option<MessageEndpoint>,
130    target: MessageEndpoint,
131}
132
133impl MessageRoute {
134    /// Create routing metadata from an optional source to a required target.
135    #[must_use]
136    pub const fn new(source: Option<MessageEndpoint>, target: MessageEndpoint) -> Self {
137        Self { source, target }
138    }
139
140    /// Upstream source endpoint, absent for externally injected messages.
141    #[must_use]
142    pub const fn source(&self) -> Option<&MessageEndpoint> {
143        self.source.as_ref()
144    }
145
146    /// Downstream target endpoint.
147    #[must_use]
148    pub const fn target(&self) -> &MessageEndpoint {
149        &self.target
150    }
151}
152
153/// Metadata attached to every message envelope.
154#[derive(Debug, Clone, PartialEq, Eq)]
155pub struct MessageMetadata {
156    message_id: MessageId,
157    workflow_id: WorkflowId,
158    execution: ExecutionMetadata,
159    route: MessageRoute,
160}
161
162impl MessageMetadata {
163    /// Create metadata for one message envelope.
164    #[must_use]
165    pub const fn new(
166        message_id: MessageId,
167        workflow_id: WorkflowId,
168        execution: ExecutionMetadata,
169        route: MessageRoute,
170    ) -> Self {
171        Self {
172            message_id,
173            workflow_id,
174            execution,
175            route,
176        }
177    }
178
179    /// Identifier for this message.
180    #[must_use]
181    pub const fn message_id(&self) -> &MessageId {
182        &self.message_id
183    }
184
185    /// Workflow associated with this message.
186    #[must_use]
187    pub const fn workflow_id(&self) -> &WorkflowId {
188        &self.workflow_id
189    }
190
191    /// Execution metadata associated with this message.
192    #[must_use]
193    pub const fn execution(&self) -> &ExecutionMetadata {
194        &self.execution
195    }
196
197    /// Static route for this message.
198    #[must_use]
199    pub const fn route(&self) -> &MessageRoute {
200        &self.route
201    }
202}
203
204/// Runtime message envelope that keeps payloads separate from routing metadata.
205#[derive(Debug, Clone, PartialEq, Eq)]
206pub struct MessageEnvelope<P> {
207    metadata: MessageMetadata,
208    payload: P,
209}
210
211impl<P> MessageEnvelope<P> {
212    /// Create a message envelope.
213    #[must_use]
214    pub const fn new(metadata: MessageMetadata, payload: P) -> Self {
215        Self { metadata, payload }
216    }
217
218    /// Metadata that travels with the payload.
219    #[must_use]
220    pub const fn metadata(&self) -> &MessageMetadata {
221        &self.metadata
222    }
223
224    /// Borrow the payload.
225    #[must_use]
226    pub const fn payload(&self) -> &P {
227        &self.payload
228    }
229
230    /// Consume the envelope and return the payload.
231    #[must_use]
232    pub fn into_payload(self) -> P {
233        self.payload
234    }
235
236    /// Transform the payload while preserving metadata.
237    #[must_use]
238    pub fn map_payload<Q>(self, f: impl FnOnce(P) -> Q) -> MessageEnvelope<Q> {
239        MessageEnvelope {
240            metadata: self.metadata,
241            payload: f(self.payload),
242        }
243    }
244}
245
#[cfg(test)]
mod tests {
    use super::*;
    use pureflow_types::ExecutionId;
    use serde_json::json;

    // Fixture helpers: each builds one identifier from a literal and panics
    // with a descriptive message if the identifier constructor rejects it.

    fn execution_id(value: &str) -> ExecutionId {
        ExecutionId::new(value).expect("valid execution id")
    }

    fn message_id(value: &str) -> MessageId {
        MessageId::new(value).expect("valid message id")
    }

    fn node_id(value: &str) -> NodeId {
        NodeId::new(value).expect("valid node id")
    }

    fn port_id(value: &str) -> PortId {
        PortId::new(value).expect("valid port id")
    }

    fn workflow_id(value: &str) -> WorkflowId {
        WorkflowId::new(value).expect("valid workflow id")
    }

    /// Execution metadata shared by the tests below.
    fn execution() -> ExecutionMetadata {
        ExecutionMetadata::first_attempt(execution_id("run-1"))
    }

    /// Mapping the payload must leave the metadata (id and route) untouched.
    #[test]
    fn message_envelope_keeps_payload_separate_from_metadata() {
        let route = MessageRoute::new(
            None,
            MessageEndpoint::new(node_id("consumer"), port_id("in")),
        );
        let metadata =
            MessageMetadata::new(message_id("msg-1"), workflow_id("flow"), execution(), route);

        // "payload" is seven bytes long, so the mapped payload must be 7.
        let mapped = MessageEnvelope::new(metadata, "payload").map_payload(str::len);

        assert_eq!(*mapped.payload(), 7);

        let mapped_metadata = mapped.metadata();
        assert_eq!(mapped_metadata.message_id().as_str(), "msg-1");
        assert_eq!(
            mapped_metadata.route().target().node_id().as_str(),
            "consumer"
        );
    }

    /// A cloned byte payload can be sliced while the original stays intact.
    #[test]
    fn packet_payload_bytes_clone_and_slice_without_copying_user_data() {
        let original = PacketPayload::bytes(Bytes::from_static(b"abcdef"));
        let duplicate = original.clone();

        let duplicate_bytes = duplicate
            .as_bytes()
            .expect("payload should contain bytes");
        let window = duplicate_bytes.slice(1..4);

        let original_bytes = original
            .as_bytes()
            .expect("payload should contain bytes");

        assert_eq!(original_bytes.as_ref(), b"abcdef");
        assert!(original.as_control().is_none());
        assert_eq!(window.as_ref(), b"bcd");
    }

    /// A control payload exposes its JSON value and is not a byte payload.
    #[test]
    fn packet_payload_control_carries_structured_values() {
        let command = json!({
            "command": "flush",
            "priority": 3,
        });
        let payload = PacketPayload::control(command);

        let control = payload
            .as_control()
            .expect("payload should contain control data");

        assert_eq!(control["command"], "flush");
        assert_eq!(control["priority"], 3);
        assert!(payload.as_bytes().is_none());
    }

    /// An Arrow payload hands back the record batch it was built from.
    #[cfg(feature = "arrow")]
    #[test]
    fn packet_payload_arrow_carries_record_batches() {
        use std::sync::Arc;

        use arrow_array::{Int32Array, RecordBatch};
        use arrow_schema::{DataType, Field, Schema};

        // Single non-nullable Int32 column named "value".
        let value_field = Field::new("value", DataType::Int32, false);
        let schema = Arc::new(Schema::new(vec![value_field]));
        let column = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let batch =
            RecordBatch::try_new(schema, vec![column]).expect("record batch should be valid");

        let payload = PacketPayload::from(batch.clone());

        assert_eq!(payload.as_arrow(), Some(&batch));
        assert!(payload.as_bytes().is_none());
        assert!(payload.as_control().is_none());
    }
}