plexor_core/payload.rs
1// Copyright 2025 Alecks Gates
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use crate::codec::{Codec, CodecName};
8use crate::logging::TraceContext;
9use crate::neuron::Neuron;
10use std::sync::Arc;
11use uuid::Uuid;
12
/// A typed payload containing a value, its associated neuron, and tracing context.
///
/// Instances are normally created through [`Payload::new`],
/// [`Payload::with_correlation`] or [`Payload::builder`], all of which hand back
/// an `Arc<Payload<T, C>>`. [`Payload::from_parts`] returns a bare `Payload`.
///
/// # Examples
///
/// ```rust
/// # use std::sync::Arc;
/// # use plexor_core::payload::Payload;
/// # use plexor_core::neuron::{Neuron, NeuronImpl};
/// # use plexor_core::namespace::NamespaceImpl;
/// # use plexor_core::codec::{Codec, CodecError, CodecName};
/// #
/// # #[derive(Debug, Clone, PartialEq)]
/// # struct Dummy(u32);
/// # impl CodecName for Dummy { fn name() -> &'static str { "dummy" } }
/// # impl Codec<Dummy> for Dummy {
/// #     fn encode(_: &Dummy) -> Result<Vec<u8>, CodecError> { Ok(vec![]) }
/// #     fn decode(_: &[u8]) -> Result<Dummy, CodecError> { Ok(Dummy(0)) }
/// # }
/// #
/// # let ns = Arc::new(NamespaceImpl { delimiter: ".", parts: vec!["test"] });
/// # let neuron = Arc::new(NeuronImpl::<Dummy, Dummy>::new(ns));
/// #
/// let payload = Payload::new(Dummy(42), neuron);
/// assert_eq!(*payload.value, Dummy(42));
/// ```
#[derive(Debug)]
pub struct Payload<T, C> {
    /// The typed value carried by this payload, held behind an `Arc`.
    pub value: Arc<T>,
    /// The neuron this payload is associated with, stored as a `Send + Sync`
    /// trait object.
    pub neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
    /// Tracing context; the `correlation_id`, `span_id` and `parent_id`
    /// accessors on this type read their values from it.
    pub trace: TraceContext,
}
44
45impl<T, C> Payload<T, C> {
46 pub fn from_parts(
47 value: Arc<T>,
48 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
49 trace: TraceContext,
50 ) -> Self {
51 Self {
52 value,
53 neuron,
54 trace,
55 }
56 }
57
58 /// Helper to create a new Payload with default tracing fields (new span_id, no parent_id).
59 /// Used when initiating a new signal.
60 ///
61 /// # Examples
62 ///
63 /// ```rust
64 /// # use std::sync::Arc;
65 /// # use plexor_core::payload::Payload;
66 /// # use plexor_core::neuron::{Neuron, NeuronImpl};
67 /// # use plexor_core::namespace::NamespaceImpl;
68 /// # use plexor_core::codec::{Codec, CodecError, CodecName};
69 /// #
70 /// # #[derive(Debug, Clone, PartialEq)]
71 /// # struct Dummy(u32);
72 /// # impl CodecName for Dummy { fn name() -> &'static str { "dummy" } }
73 /// # impl Codec<Dummy> for Dummy {
74 /// # fn encode(_: &Dummy) -> Result<Vec<u8>, CodecError> { Ok(vec![]) }
75 /// # fn decode(_: &[u8]) -> Result<Dummy, CodecError> { Ok(Dummy(0)) }
76 /// # }
77 /// #
78 /// # let ns = Arc::new(NamespaceImpl { delimiter: ".", parts: vec!["test"] });
79 /// # let neuron = Arc::new(NeuronImpl::<Dummy, Dummy>::new(ns));
80 /// # use uuid::Uuid;
81 /// #
82 /// let correlation_id = Uuid::now_v7();
83 /// let payload = Payload::with_correlation(Dummy(42), neuron, Some(correlation_id));
84 /// assert_eq!(payload.correlation_id(), correlation_id);
85 /// ```
86 pub fn with_correlation(
87 value: T,
88 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
89 correlation_id: Option<Uuid>,
90 ) -> Arc<Self>
91 where
92 T: Send + Sync + 'static,
93 C: Codec<T> + CodecName + Send + Sync + 'static,
94 {
95 let correlation = correlation_id.unwrap_or_else(Uuid::now_v7);
96 // New trace: span_id is lower 64 bits of correlation_id
97 let span_id = correlation.as_u128() as u64;
98
99 Arc::new(Payload::from_parts(
100 Arc::new(value),
101 neuron,
102 TraceContext::from_parts(correlation, span_id, None),
103 ))
104 }
105
106 /// Simplified creation method for new signals.
107 /// automatically generates correlation_id and other tracing fields.
108 ///
109 /// # Examples
110 ///
111 /// ```rust
112 /// # use std::sync::Arc;
113 /// # use plexor_core::payload::Payload;
114 /// # use plexor_core::neuron::{Neuron, NeuronImpl};
115 /// # use plexor_core::namespace::NamespaceImpl;
116 /// # use plexor_core::codec::{Codec, CodecError, CodecName};
117 /// #
118 /// # #[derive(Debug, Clone, PartialEq)]
119 /// # struct Dummy { foo: i32 }
120 /// # impl CodecName for Dummy { fn name() -> &'static str { "dummy" } }
121 /// # impl Codec<Dummy> for Dummy {
122 /// # fn encode(_: &Dummy) -> Result<Vec<u8>, CodecError> { Ok(vec![]) }
123 /// # fn decode(_: &[u8]) -> Result<Dummy, CodecError> { Ok(Dummy { foo: 42 }) }
124 /// # }
125 /// #
126 /// let ns = Arc::new(NamespaceImpl { delimiter: ".", parts: vec!["test"] });
127 /// let neuron = Arc::new(NeuronImpl::<Dummy, Dummy>::new(ns));
128 /// let data = Dummy { foo: 123 };
129 ///
130 /// let payload = Payload::new(data.clone(), neuron);
131 /// assert_eq!(*payload.value, data);
132 /// ```
133 pub fn new(value: T, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> Arc<Self>
134 where
135 T: Send + Sync + 'static,
136 C: Codec<T> + CodecName + Send + Sync + 'static,
137 {
138 Self::with_correlation(value, neuron, None)
139 }
140
141 /// Create a builder for constructing a Payload.
142 ///
143 /// # Examples
144 ///
145 /// ```rust
146 /// # use std::sync::Arc;
147 /// # use plexor_core::payload::Payload;
148 /// # use plexor_core::neuron::{Neuron, NeuronImpl};
149 /// # use plexor_core::namespace::NamespaceImpl;
150 /// # use plexor_core::codec::{Codec, CodecError, CodecName};
151 /// #
152 /// # #[derive(Debug, Clone, PartialEq)]
153 /// # struct Dummy(u32);
154 /// # impl CodecName for Dummy { fn name() -> &'static str { "dummy" } }
155 /// # impl Codec<Dummy> for Dummy {
156 /// # fn encode(_: &Dummy) -> Result<Vec<u8>, CodecError> { Ok(vec![]) }
157 /// # fn decode(_: &[u8]) -> Result<Dummy, CodecError> { Ok(Dummy(0)) }
158 /// # }
159 /// #
160 /// # let ns = Arc::new(NamespaceImpl { delimiter: ".", parts: vec!["test"] });
161 /// # let neuron = Arc::new(NeuronImpl::<Dummy, Dummy>::new(ns));
162 /// #
163 /// let payload = Payload::builder()
164 /// .value(Dummy(42))
165 /// .neuron(neuron)
166 /// .build()
167 /// .unwrap();
168 /// assert_eq!(*payload.value, Dummy(42));
169 /// ```
170 pub fn builder() -> PayloadBuilder<T, C> {
171 PayloadBuilder::default()
172 }
173
174 pub fn correlation_id(&self) -> Uuid {
175 self.trace.correlation_id
176 }
177
178 pub fn span_id(&self) -> u64 {
179 self.trace.span_id
180 }
181
182 pub fn parent_id(&self) -> Option<u64> {
183 self.trace.parent_id
184 }
185}
186
187pub struct PayloadBuilder<T, C> {
188 value: Option<T>,
189 correlation_id: Option<Uuid>,
190 neuron: Option<Arc<dyn Neuron<T, C> + Send + Sync>>,
191 span_id: Option<u64>,
192 parent_id: Option<u64>,
193}
194
195impl<T, C> Default for PayloadBuilder<T, C> {
196 fn default() -> Self {
197 Self {
198 value: None,
199 correlation_id: None,
200 neuron: None,
201 span_id: None,
202 parent_id: None,
203 }
204 }
205}
206
207impl<T, C> PayloadBuilder<T, C> {
208 pub fn value(mut self, value: T) -> Self {
209 self.value = Some(value);
210 self
211 }
212
213 pub fn correlation_id(mut self, id: Uuid) -> Self {
214 self.correlation_id = Some(id);
215 self
216 }
217
218 pub fn neuron(mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> Self {
219 self.neuron = Some(neuron);
220 self
221 }
222
223 pub fn span_id(mut self, id: u64) -> Self {
224 self.span_id = Some(id);
225 self
226 }
227
228 pub fn parent_id(mut self, id: u64) -> Self {
229 self.parent_id = Some(id);
230 self
231 }
232
233 pub fn build(self) -> Result<Arc<Payload<T, C>>, String>
234 where
235 T: Send + Sync + 'static,
236 C: Codec<T> + CodecName + Send + Sync + 'static,
237 {
238 let value = self.value.ok_or("Value is required")?;
239 let neuron = self.neuron.ok_or("Neuron is required")?;
240 let correlation = self.correlation_id.unwrap_or_else(Uuid::now_v7);
241
242 let span = self.span_id.unwrap_or_else(|| correlation.as_u128() as u64);
243
244 Ok(Arc::new(Payload {
245 value: Arc::new(value),
246 neuron,
247 trace: TraceContext::from_parts(correlation, span, self.parent_id),
248 }))
249 }
250}
251
/// A type-erased raw payload containing bytes, its associated neuron, and tracing context.
///
/// The value is stored as raw bytes; the type parameters `T` and `C` appear
/// only in the type of the associated neuron.
///
/// # Examples
///
/// ```rust
/// # use std::sync::Arc;
/// # use plexor_core::payload::PayloadRaw;
/// # use plexor_core::neuron::{Neuron, NeuronImpl};
/// # use plexor_core::namespace::NamespaceImpl;
/// # use plexor_core::codec::{Codec, CodecError, CodecName};
/// #
/// # #[derive(Debug, Clone, PartialEq)]
/// # struct Dummy;
/// # impl CodecName for Dummy { fn name() -> &'static str { "dummy" } }
/// # impl Codec<Dummy> for Dummy {
/// #     fn encode(_: &Dummy) -> Result<Vec<u8>, CodecError> { Ok(vec![1, 2, 3]) }
/// #     fn decode(_: &[u8]) -> Result<Dummy, CodecError> { Ok(Dummy) }
/// # }
/// #
/// # let ns = Arc::new(NamespaceImpl { delimiter: ".", parts: vec!["test"] });
/// # let neuron = Arc::new(NeuronImpl::<Dummy, Dummy>::new(ns));
/// #
/// let payload = PayloadRaw::new(vec![1, 2, 3], neuron);
/// assert_eq!(*payload.value, vec![1, 2, 3]);
/// ```
#[derive(Debug)]
pub struct PayloadRaw<T, C> {
    /// The raw bytes carried by this payload, held behind an `Arc`.
    pub value: Arc<Vec<u8>>,
    /// The neuron this payload is associated with, stored as a `Send + Sync`
    /// trait object.
    pub neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
    /// Tracing context; the `correlation_id`, `span_id` and `parent_id`
    /// accessors on this type read their values from it.
    pub trace: TraceContext,
}
283
284impl<T, C> PayloadRaw<T, C> {
285 pub fn from_parts(
286 value: Arc<Vec<u8>>,
287 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
288 trace: TraceContext,
289 ) -> Self {
290 Self {
291 value,
292 neuron,
293 trace,
294 }
295 }
296
297 /// Helper to create a new PayloadRaw with default tracing fields (new span_id, no parent_id).
298 /// Used when initiating a new signal where payload is already raw bytes.
299 pub fn with_correlation(
300 value: Vec<u8>,
301 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
302 correlation_id: Option<Uuid>,
303 ) -> Arc<Self>
304 where
305 T: Send + Sync + 'static,
306 C: Codec<T> + CodecName + Send + Sync + 'static,
307 {
308 let correlation = correlation_id.unwrap_or_else(Uuid::now_v7);
309 let span_id = correlation.as_u128() as u64;
310
311 Arc::new(PayloadRaw::from_parts(
312 Arc::new(value),
313 neuron,
314 TraceContext::from_parts(correlation, span_id, None),
315 ))
316 }
317
318 /// # Examples
319 ///
320 /// ```rust
321 /// # use std::sync::Arc;
322 /// # use plexor_core::payload::PayloadRaw;
323 /// # use plexor_core::neuron::{Neuron, NeuronImpl};
324 /// # use plexor_core::namespace::NamespaceImpl;
325 /// # use plexor_core::codec::{Codec, CodecError, CodecName};
326 /// #
327 /// # #[derive(Debug, Clone)]
328 /// # struct Dummy;
329 /// # impl CodecName for Dummy { fn name() -> &'static str { "dummy" } }
330 /// # impl Codec<Dummy> for Dummy {
331 /// # fn encode(_: &Dummy) -> Result<Vec<u8>, CodecError> { Ok(vec![]) }
332 /// # fn decode(_: &[u8]) -> Result<Dummy, CodecError> { Ok(Dummy) }
333 /// # }
334 /// #
335 /// let ns = Arc::new(NamespaceImpl { delimiter: ".", parts: vec!["test"] });
336 /// let neuron = Arc::new(NeuronImpl::<Dummy, Dummy>::new(ns));
337 /// let raw_data = vec![1, 2, 3];
338 ///
339 /// let payload = PayloadRaw::new(raw_data.clone(), neuron);
340 /// assert_eq!(*payload.value, raw_data);
341 /// ```
342 pub fn new(value: Vec<u8>, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> Arc<Self>
343 where
344 T: Send + Sync + 'static,
345 C: Codec<T> + CodecName + Send + Sync + 'static,
346 {
347 Self::with_correlation(value, neuron, None)
348 }
349
350 pub fn correlation_id(&self) -> Uuid {
351 self.trace.correlation_id
352 }
353
354 pub fn span_id(&self) -> u64 {
355 self.trace.span_id
356 }
357
358 pub fn parent_id(&self) -> Option<u64> {
359 self.trace.parent_id
360 }
361}