// plexor_core/payload.rs

// Copyright 2025 Alecks Gates
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use crate::codec::{Codec, CodecName};
8use crate::logging::TraceContext;
9use crate::neuron::Neuron;
10use std::sync::Arc;
11use uuid::Uuid;
12
13/// A typed payload containing a value, its associated neuron, and tracing context.
14///
15/// # Examples
16///
17/// ```rust
18/// # use std::sync::Arc;
19/// # use plexor_core::payload::Payload;
20/// # use plexor_core::neuron::{Neuron, NeuronImpl};
21/// # use plexor_core::namespace::NamespaceImpl;
22/// # use plexor_core::codec::{Codec, CodecError, CodecName};
23/// #
24/// # #[derive(Debug, Clone, PartialEq)]
25/// # struct Dummy(u32);
26/// # impl CodecName for Dummy { fn name() -> &'static str { "dummy" } }
27/// # impl Codec<Dummy> for Dummy {
28/// #     fn encode(_: &Dummy) -> Result<Vec<u8>, CodecError> { Ok(vec![]) }
29/// #     fn decode(_: &[u8]) -> Result<Dummy, CodecError> { Ok(Dummy(0)) }
30/// # }
31/// #
32/// # let ns = Arc::new(NamespaceImpl { delimiter: ".", parts: vec!["test"] });
33/// # let neuron = Arc::new(NeuronImpl::<Dummy, Dummy>::new(ns));
34/// #
35/// let payload = Payload::new(Dummy(42), neuron);
36/// assert_eq!(*payload.value, Dummy(42));
37/// ```
38#[derive(Debug)]
39pub struct Payload<T, C> {
40    pub value: Arc<T>,
41    pub neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
42    pub trace: TraceContext,
43}
44
45impl<T, C> Payload<T, C> {
46    pub fn from_parts(
47        value: Arc<T>,
48        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
49        trace: TraceContext,
50    ) -> Self {
51        Self {
52            value,
53            neuron,
54            trace,
55        }
56    }
57
58    /// Helper to create a new Payload with default tracing fields (new span_id, no parent_id).
59    /// Used when initiating a new signal.
60    ///
61    /// # Examples
62    ///
63    /// ```rust
64    /// # use std::sync::Arc;
65    /// # use plexor_core::payload::Payload;
66    /// # use plexor_core::neuron::{Neuron, NeuronImpl};
67    /// # use plexor_core::namespace::NamespaceImpl;
68    /// # use plexor_core::codec::{Codec, CodecError, CodecName};
69    /// #
70    /// # #[derive(Debug, Clone, PartialEq)]
71    /// # struct Dummy(u32);
72    /// # impl CodecName for Dummy { fn name() -> &'static str { "dummy" } }
73    /// # impl Codec<Dummy> for Dummy {
74    /// #     fn encode(_: &Dummy) -> Result<Vec<u8>, CodecError> { Ok(vec![]) }
75    /// #     fn decode(_: &[u8]) -> Result<Dummy, CodecError> { Ok(Dummy(0)) }
76    /// # }
77    /// #
78    /// # let ns = Arc::new(NamespaceImpl { delimiter: ".", parts: vec!["test"] });
79    /// # let neuron = Arc::new(NeuronImpl::<Dummy, Dummy>::new(ns));
80    /// # use uuid::Uuid;
81    /// #
82    /// let correlation_id = Uuid::now_v7();
83    /// let payload = Payload::with_correlation(Dummy(42), neuron, Some(correlation_id));
84    /// assert_eq!(payload.correlation_id(), correlation_id);
85    /// ```
86    pub fn with_correlation(
87        value: T,
88        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
89        correlation_id: Option<Uuid>,
90    ) -> Arc<Self>
91    where
92        T: Send + Sync + 'static,
93        C: Codec<T> + CodecName + Send + Sync + 'static,
94    {
95        let correlation = correlation_id.unwrap_or_else(Uuid::now_v7);
96        // New trace: span_id is lower 64 bits of correlation_id
97        let span_id = correlation.as_u128() as u64;
98
99        Arc::new(Payload::from_parts(
100            Arc::new(value),
101            neuron,
102            TraceContext::from_parts(correlation, span_id, None),
103        ))
104    }
105
106    /// Simplified creation method for new signals.
107    /// automatically generates correlation_id and other tracing fields.
108    ///
109    /// # Examples
110    ///
111    /// ```rust
112    /// # use std::sync::Arc;
113    /// # use plexor_core::payload::Payload;
114    /// # use plexor_core::neuron::{Neuron, NeuronImpl};
115    /// # use plexor_core::namespace::NamespaceImpl;
116    /// # use plexor_core::codec::{Codec, CodecError, CodecName};
117    /// #
118    /// # #[derive(Debug, Clone, PartialEq)]
119    /// # struct Dummy { foo: i32 }
120    /// # impl CodecName for Dummy { fn name() -> &'static str { "dummy" } }
121    /// # impl Codec<Dummy> for Dummy {
122    /// #     fn encode(_: &Dummy) -> Result<Vec<u8>, CodecError> { Ok(vec![]) }
123    /// #     fn decode(_: &[u8]) -> Result<Dummy, CodecError> { Ok(Dummy { foo: 42 }) }
124    /// # }
125    /// #
126    /// let ns = Arc::new(NamespaceImpl { delimiter: ".", parts: vec!["test"] });
127    /// let neuron = Arc::new(NeuronImpl::<Dummy, Dummy>::new(ns));
128    /// let data = Dummy { foo: 123 };
129    ///
130    /// let payload = Payload::new(data.clone(), neuron);
131    /// assert_eq!(*payload.value, data);
132    /// ```
133    pub fn new(value: T, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> Arc<Self>
134    where
135        T: Send + Sync + 'static,
136        C: Codec<T> + CodecName + Send + Sync + 'static,
137    {
138        Self::with_correlation(value, neuron, None)
139    }
140
141    /// Create a builder for constructing a Payload.
142    ///
143    /// # Examples
144    ///
145    /// ```rust
146    /// # use std::sync::Arc;
147    /// # use plexor_core::payload::Payload;
148    /// # use plexor_core::neuron::{Neuron, NeuronImpl};
149    /// # use plexor_core::namespace::NamespaceImpl;
150    /// # use plexor_core::codec::{Codec, CodecError, CodecName};
151    /// #
152    /// # #[derive(Debug, Clone, PartialEq)]
153    /// # struct Dummy(u32);
154    /// # impl CodecName for Dummy { fn name() -> &'static str { "dummy" } }
155    /// # impl Codec<Dummy> for Dummy {
156    /// #     fn encode(_: &Dummy) -> Result<Vec<u8>, CodecError> { Ok(vec![]) }
157    /// #     fn decode(_: &[u8]) -> Result<Dummy, CodecError> { Ok(Dummy(0)) }
158    /// # }
159    /// #
160    /// # let ns = Arc::new(NamespaceImpl { delimiter: ".", parts: vec!["test"] });
161    /// # let neuron = Arc::new(NeuronImpl::<Dummy, Dummy>::new(ns));
162    /// #
163    /// let payload = Payload::builder()
164    ///     .value(Dummy(42))
165    ///     .neuron(neuron)
166    ///     .build()
167    ///     .unwrap();
168    /// assert_eq!(*payload.value, Dummy(42));
169    /// ```
170    pub fn builder() -> PayloadBuilder<T, C> {
171        PayloadBuilder::default()
172    }
173
174    pub fn correlation_id(&self) -> Uuid {
175        self.trace.correlation_id
176    }
177
178    pub fn span_id(&self) -> u64 {
179        self.trace.span_id
180    }
181
182    pub fn parent_id(&self) -> Option<u64> {
183        self.trace.parent_id
184    }
185}
186
187pub struct PayloadBuilder<T, C> {
188    value: Option<T>,
189    correlation_id: Option<Uuid>,
190    neuron: Option<Arc<dyn Neuron<T, C> + Send + Sync>>,
191    span_id: Option<u64>,
192    parent_id: Option<u64>,
193}
194
195impl<T, C> Default for PayloadBuilder<T, C> {
196    fn default() -> Self {
197        Self {
198            value: None,
199            correlation_id: None,
200            neuron: None,
201            span_id: None,
202            parent_id: None,
203        }
204    }
205}
206
207impl<T, C> PayloadBuilder<T, C> {
208    pub fn value(mut self, value: T) -> Self {
209        self.value = Some(value);
210        self
211    }
212
213    pub fn correlation_id(mut self, id: Uuid) -> Self {
214        self.correlation_id = Some(id);
215        self
216    }
217
218    pub fn neuron(mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> Self {
219        self.neuron = Some(neuron);
220        self
221    }
222
223    pub fn span_id(mut self, id: u64) -> Self {
224        self.span_id = Some(id);
225        self
226    }
227
228    pub fn parent_id(mut self, id: u64) -> Self {
229        self.parent_id = Some(id);
230        self
231    }
232
233    pub fn build(self) -> Result<Arc<Payload<T, C>>, String>
234    where
235        T: Send + Sync + 'static,
236        C: Codec<T> + CodecName + Send + Sync + 'static,
237    {
238        let value = self.value.ok_or("Value is required")?;
239        let neuron = self.neuron.ok_or("Neuron is required")?;
240        let correlation = self.correlation_id.unwrap_or_else(Uuid::now_v7);
241
242        let span = self.span_id.unwrap_or_else(|| correlation.as_u128() as u64);
243
244        Ok(Arc::new(Payload {
245            value: Arc::new(value),
246            neuron,
247            trace: TraceContext::from_parts(correlation, span, self.parent_id),
248        }))
249    }
250}
251
252/// A type-erased raw payload containing bytes, its associated neuron, and tracing context.
253///
254/// # Examples
255///
256/// ```rust
257/// # use std::sync::Arc;
258/// # use plexor_core::payload::PayloadRaw;
259/// # use plexor_core::neuron::{Neuron, NeuronImpl};
260/// # use plexor_core::namespace::NamespaceImpl;
261/// # use plexor_core::codec::{Codec, CodecError, CodecName};
262/// #
263/// # #[derive(Debug, Clone, PartialEq)]
264/// # struct Dummy;
265/// # impl CodecName for Dummy { fn name() -> &'static str { "dummy" } }
266/// # impl Codec<Dummy> for Dummy {
267/// #     fn encode(_: &Dummy) -> Result<Vec<u8>, CodecError> { Ok(vec![1, 2, 3]) }
268/// #     fn decode(_: &[u8]) -> Result<Dummy, CodecError> { Ok(Dummy) }
269/// # }
270/// #
271/// # let ns = Arc::new(NamespaceImpl { delimiter: ".", parts: vec!["test"] });
272/// # let neuron = Arc::new(NeuronImpl::<Dummy, Dummy>::new(ns));
273/// #
274/// let payload = PayloadRaw::new(vec![1, 2, 3], neuron);
275/// assert_eq!(*payload.value, vec![1, 2, 3]);
276/// ```
277#[derive(Debug)]
278pub struct PayloadRaw<T, C> {
279    pub value: Arc<Vec<u8>>,
280    pub neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
281    pub trace: TraceContext,
282}
283
284impl<T, C> PayloadRaw<T, C> {
285    pub fn from_parts(
286        value: Arc<Vec<u8>>,
287        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
288        trace: TraceContext,
289    ) -> Self {
290        Self {
291            value,
292            neuron,
293            trace,
294        }
295    }
296
297    /// Helper to create a new PayloadRaw with default tracing fields (new span_id, no parent_id).
298    /// Used when initiating a new signal where payload is already raw bytes.
299    pub fn with_correlation(
300        value: Vec<u8>,
301        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
302        correlation_id: Option<Uuid>,
303    ) -> Arc<Self>
304    where
305        T: Send + Sync + 'static,
306        C: Codec<T> + CodecName + Send + Sync + 'static,
307    {
308        let correlation = correlation_id.unwrap_or_else(Uuid::now_v7);
309        let span_id = correlation.as_u128() as u64;
310
311        Arc::new(PayloadRaw::from_parts(
312            Arc::new(value),
313            neuron,
314            TraceContext::from_parts(correlation, span_id, None),
315        ))
316    }
317
318    /// # Examples
319    ///
320    /// ```rust
321    /// # use std::sync::Arc;
322    /// # use plexor_core::payload::PayloadRaw;
323    /// # use plexor_core::neuron::{Neuron, NeuronImpl};
324    /// # use plexor_core::namespace::NamespaceImpl;
325    /// # use plexor_core::codec::{Codec, CodecError, CodecName};
326    /// #
327    /// # #[derive(Debug, Clone)]
328    /// # struct Dummy;
329    /// # impl CodecName for Dummy { fn name() -> &'static str { "dummy" } }
330    /// # impl Codec<Dummy> for Dummy {
331    /// #     fn encode(_: &Dummy) -> Result<Vec<u8>, CodecError> { Ok(vec![]) }
332    /// #     fn decode(_: &[u8]) -> Result<Dummy, CodecError> { Ok(Dummy) }
333    /// # }
334    /// #
335    /// let ns = Arc::new(NamespaceImpl { delimiter: ".", parts: vec!["test"] });
336    /// let neuron = Arc::new(NeuronImpl::<Dummy, Dummy>::new(ns));
337    /// let raw_data = vec![1, 2, 3];
338    ///
339    /// let payload = PayloadRaw::new(raw_data.clone(), neuron);
340    /// assert_eq!(*payload.value, raw_data);
341    /// ```
342    pub fn new(value: Vec<u8>, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> Arc<Self>
343    where
344        T: Send + Sync + 'static,
345        C: Codec<T> + CodecName + Send + Sync + 'static,
346    {
347        Self::with_correlation(value, neuron, None)
348    }
349
350    pub fn correlation_id(&self) -> Uuid {
351        self.trace.correlation_id
352    }
353
354    pub fn span_id(&self) -> u64 {
355        self.trace.span_id
356    }
357
358    pub fn parent_id(&self) -> Option<u64> {
359        self.trace.parent_id
360    }
361}