Skip to main content

plexor_core/erasure/
payload.rs

1// Copyright 2025 Alecks Gates
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7//! Type-erased versions of payload traits.
8
9use crate::codec::{Codec, CodecName};
10use crate::erasure::error::{ErasureError, ErasureResult};
11use crate::erasure::neuron::{NeuronErased, NeuronErasedWrapper};
12use crate::logging::{LogTrace, TraceContext};
13use crate::payload::{Payload, PayloadRaw};
14use std::any::{Any, TypeId};
15use std::marker::PhantomData;
16use std::sync::Arc;
17use uuid::Uuid;
18
19/// Type-erased payload that can be stored in collections with other type-erased payloads
20pub trait PayloadErased: LogTrace + Send + Sync + 'static {
21    fn as_any(&self) -> &dyn Any;
22    fn get_trace_context(&self) -> TraceContext;
23    fn get_correlation_id(&self) -> Uuid {
24        self.get_trace_context().correlation_id
25    }
26    fn get_neuron_name(&self) -> String;
27    fn get_span_id(&self) -> u64 {
28        self.get_trace_context().span_id
29    }
30    fn get_parent_id(&self) -> Option<u64> {
31        self.get_trace_context().parent_id
32    }
33    fn payload_type_id(&self) -> TypeId;
34    fn codec_type_id(&self) -> TypeId;
35    fn clone_to_box(&self) -> Box<dyn PayloadErased + Send + Sync + 'static>;
36    fn clone_to_arc(&self) -> Arc<dyn PayloadErased + Send + Sync + 'static>;
37    fn get_erased_neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static>;
38    fn get_value(&self) -> &dyn std::any::Any;
39}
40
41/// Type-erased payload raw that can be stored in collections with other type-erased payload raws
42pub trait PayloadRawErased: LogTrace + Send + Sync + 'static {
43    fn as_any(&self) -> &dyn Any;
44    fn get_bytes(&self) -> Arc<Vec<u8>>;
45    fn get_trace_context(&self) -> TraceContext;
46    fn get_correlation_id(&self) -> Uuid {
47        self.get_trace_context().correlation_id
48    }
49    fn get_neuron_name(&self) -> String;
50    fn get_span_id(&self) -> u64 {
51        self.get_trace_context().span_id
52    }
53    fn get_parent_id(&self) -> Option<u64> {
54        self.get_trace_context().parent_id
55    }
56    fn get_erased_neuron(&self) -> Option<Arc<dyn NeuronErased + Send + Sync + 'static>>;
57    fn payload_type_id(&self) -> TypeId;
58    fn codec_type_id(&self) -> TypeId;
59    fn clone_to_box(&self) -> Box<dyn PayloadRawErased + Send + Sync + 'static>;
60    fn clone_to_arc(&self) -> Arc<dyn PayloadRawErased + Send + Sync + 'static>;
61}
62
63/// A simple implementation of PayloadRawErased that doesn't require type parameters.
64/// Useful for transport-level handling before the specific types are known.
65#[derive(Debug)]
66pub struct SimplePayloadRawErased {
67    pub bytes: Arc<Vec<u8>>,
68    pub neuron_name: String,
69    pub trace: TraceContext,
70    pub payload_type_id: TypeId,
71    pub codec_type_id: TypeId,
72}
73
74impl SimplePayloadRawErased {
75    pub fn new(
76        bytes: Arc<Vec<u8>>,
77        neuron_name: String,
78        trace: TraceContext,
79        payload_type_id: TypeId,
80        codec_type_id: TypeId,
81    ) -> Self {
82        Self {
83            bytes,
84            neuron_name,
85            trace,
86            payload_type_id,
87            codec_type_id,
88        }
89    }
90}
91
92impl PayloadRawErased for SimplePayloadRawErased {
93    fn as_any(&self) -> &dyn Any {
94        self
95    }
96    fn get_bytes(&self) -> Arc<Vec<u8>> {
97        self.bytes.clone()
98    }
99    fn get_trace_context(&self) -> TraceContext {
100        self.trace
101    }
102    fn get_neuron_name(&self) -> String {
103        self.neuron_name.clone()
104    }
105    fn get_erased_neuron(&self) -> Option<Arc<dyn NeuronErased + Send + Sync + 'static>> {
106        None
107    }
108    fn payload_type_id(&self) -> TypeId {
109        self.payload_type_id
110    }
111    fn codec_type_id(&self) -> TypeId {
112        self.codec_type_id
113    }
114    fn clone_to_box(&self) -> Box<dyn PayloadRawErased + Send + Sync + 'static> {
115        Box::new(Self {
116            bytes: self.bytes.clone(),
117            neuron_name: self.neuron_name.clone(),
118            trace: self.trace,
119            payload_type_id: self.payload_type_id,
120            codec_type_id: self.codec_type_id,
121        })
122    }
123    fn clone_to_arc(&self) -> Arc<dyn PayloadRawErased + Send + Sync + 'static> {
124        Arc::new(Self {
125            bytes: self.bytes.clone(),
126            neuron_name: self.neuron_name.clone(),
127            trace: self.trace,
128            payload_type_id: self.payload_type_id,
129            codec_type_id: self.codec_type_id,
130        })
131    }
132}
133
134/// Wrapper that implements PayloadErased for any concrete Payload
135#[derive(Debug)]
136pub struct PayloadErasedWrapper<T: 'static, C: 'static> {
137    payload: Arc<Payload<T, C>>,
138    _phantom: PhantomData<(T, C)>,
139}
140
141impl<T, C> PayloadErasedWrapper<T, C>
142where
143    T: Send + Sync + 'static,
144    C: Codec<T> + CodecName + Send + Sync + 'static,
145{
146    pub fn new(payload: Arc<Payload<T, C>>) -> Self {
147        Self {
148            payload,
149            _phantom: PhantomData,
150        }
151    }
152
153    /// Create a type-erased payload from a correctly typed payload
154    pub fn from_typed_payload(
155        payload: Arc<Payload<T, C>>,
156    ) -> Arc<dyn PayloadErased + Send + Sync + 'static> {
157        Arc::new(Self::new(payload))
158    }
159
160    /// Get the underlying typed payload
161    pub fn get_typed_payload(&self) -> Arc<Payload<T, C>> {
162        self.payload.clone()
163    }
164
165    /// Get the neuron as a type-erased neuron
166    pub fn get_erased_neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static> {
167        NeuronErasedWrapper::from_typed_neuron(self.payload.neuron.clone())
168    }
169
170    /// Convert this wrapper to a correctly typed payload with different type parameters
171    /// Returns an error if the type parameters don't match
172    pub fn to_typed_payload<U, D>(&self) -> ErasureResult<Arc<Payload<U, D>>>
173    where
174        U: Send + Sync + 'static,
175        D: Send + Sync + 'static,
176    {
177        // Check if the type parameters match using Any downcast (safest)
178        if let Some(wrapper) = (self as &dyn Any).downcast_ref::<PayloadErasedWrapper<U, D>>() {
179            Ok(wrapper.payload.clone())
180        } else {
181            Err(ErasureError::PayloadTypeMismatch {
182                expected_payload_type: TypeId::of::<U>(),
183                expected_codec_type: TypeId::of::<D>(),
184                actual_payload_type: TypeId::of::<T>(),
185                actual_codec_type: TypeId::of::<C>(),
186            })
187        }
188    }
189}
190
191impl<T, C> PayloadErased for PayloadErasedWrapper<T, C>
192where
193    T: Send + Sync + 'static,
194    C: Codec<T> + CodecName + Send + Sync + 'static,
195{
196    fn as_any(&self) -> &dyn Any {
197        self
198    }
199
200    fn get_trace_context(&self) -> TraceContext {
201        self.payload.trace
202    }
203
204    fn get_neuron_name(&self) -> String {
205        self.payload.neuron.name()
206    }
207
208    fn payload_type_id(&self) -> TypeId {
209        TypeId::of::<T>()
210    }
211
212    fn codec_type_id(&self) -> TypeId {
213        TypeId::of::<C>()
214    }
215
216    fn clone_to_box(&self) -> Box<dyn PayloadErased + Send + Sync + 'static> {
217        Box::new(PayloadErasedWrapper {
218            payload: self.payload.clone(),
219            _phantom: PhantomData,
220        })
221    }
222
223    fn clone_to_arc(&self) -> Arc<dyn PayloadErased + Send + Sync + 'static> {
224        Arc::new(PayloadErasedWrapper {
225            payload: self.payload.clone(),
226            _phantom: PhantomData,
227        })
228    }
229
230    fn get_erased_neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static> {
231        NeuronErasedWrapper::from_typed_neuron(self.payload.neuron.clone())
232    }
233
234    fn get_value(&self) -> &dyn std::any::Any {
235        &*self.payload.value as &dyn std::any::Any
236    }
237}
238
239/// Wrapper that implements PayloadRawErased for PayloadRaw
240#[derive(Debug)]
241pub struct PayloadRawErasedWrapper<T: 'static, C: 'static> {
242    payload: Arc<PayloadRaw<T, C>>,
243    _phantom: PhantomData<(T, C)>,
244}
245
246impl<T, C> PayloadRawErasedWrapper<T, C>
247where
248    T: Send + Sync + 'static,
249    C: Codec<T> + CodecName + Send + Sync + 'static,
250{
251    pub fn new(payload: Arc<PayloadRaw<T, C>>) -> Self {
252        Self {
253            payload,
254            _phantom: PhantomData,
255        }
256    }
257
258    /// Create a type-erased payload raw from a payload raw
259    pub fn from_payload_raw(
260        payload: Arc<PayloadRaw<T, C>>,
261    ) -> Arc<dyn PayloadRawErased + Send + Sync + 'static> {
262        Arc::new(Self::new(payload))
263    }
264
265    /// Get the underlying payload raw
266    pub fn get_payload_raw(&self) -> Arc<PayloadRaw<T, C>> {
267        self.payload.clone()
268    }
269
270    /// Convert this wrapper to a correctly typed payload raw
271    pub fn to_typed_payload(&self) -> Arc<PayloadRaw<T, C>> {
272        self.payload.clone()
273    }
274}
275
276impl<T, C> PayloadRawErased for PayloadRawErasedWrapper<T, C>
277where
278    T: Send + Sync + 'static,
279    C: Codec<T> + CodecName + Send + Sync + 'static,
280{
281    fn as_any(&self) -> &dyn Any {
282        self
283    }
284
285    fn get_bytes(&self) -> Arc<Vec<u8>> {
286        self.payload.value.clone()
287    }
288
289    fn get_trace_context(&self) -> TraceContext {
290        self.payload.trace
291    }
292
293    fn get_neuron_name(&self) -> String {
294        match &self.payload.neuron {
295            Some(neuron) => neuron.name(),
296            None => "unknown".to_string(), // Provide a default name when neuron is None
297        }
298    }
299
300    fn get_erased_neuron(&self) -> Option<Arc<dyn NeuronErased + Send + Sync + 'static>> {
301        self.payload
302            .neuron
303            .as_ref()
304            .map(|n| NeuronErasedWrapper::from_typed_neuron(n.clone()))
305    }
306
307    fn payload_type_id(&self) -> TypeId {
308        TypeId::of::<T>()
309    }
310
311    fn codec_type_id(&self) -> TypeId {
312        TypeId::of::<C>()
313    }
314
315    fn clone_to_box(&self) -> Box<dyn PayloadRawErased + Send + Sync + 'static> {
316        Box::new(PayloadRawErasedWrapper {
317            payload: self.payload.clone(),
318            _phantom: PhantomData,
319        })
320    }
321
322    fn clone_to_arc(&self) -> Arc<dyn PayloadRawErased + Send + Sync + 'static> {
323        Arc::new(PayloadRawErasedWrapper {
324            payload: self.payload.clone(),
325            _phantom: PhantomData,
326        })
327    }
328}
329
330/// Convenience function to create a type-erased payload from a correctly typed payload
331pub fn erase_payload<T, C>(
332    payload: Arc<Payload<T, C>>,
333) -> Arc<dyn PayloadErased + Send + Sync + 'static>
334where
335    T: Send + Sync + 'static,
336    C: Codec<T> + CodecName + Send + Sync + 'static,
337{
338    PayloadErasedWrapper::from_typed_payload(payload)
339}
340
341/// Convenience function to create a type-erased payload raw from a payload raw
342pub fn erase_payload_raw<T, C>(
343    payload: Arc<PayloadRaw<T, C>>,
344) -> Arc<dyn PayloadRawErased + Send + Sync + 'static>
345where
346    T: Send + Sync + 'static,
347    C: Codec<T> + CodecName + Send + Sync + 'static,
348{
349    PayloadRawErasedWrapper::from_payload_raw(payload)
350}
351
352/// Convenience function to convert a type-erased payload wrapper back to a correctly typed payload
353pub fn unerase_payload<T, C>(wrapper: &PayloadErasedWrapper<T, C>) -> Arc<Payload<T, C>>
354where
355    T: Send + Sync + 'static,
356    C: Codec<T> + CodecName + Send + Sync + 'static,
357{
358    wrapper.get_typed_payload()
359}
360
361/// Convenience function to convert a type-erased payload raw wrapper back to a payload raw
362pub fn unerase_payload_raw<T, C>(wrapper: &PayloadRawErasedWrapper<T, C>) -> Arc<PayloadRaw<T, C>>
363where
364    T: Send + Sync + 'static,
365    C: Codec<T> + CodecName + Send + Sync + 'static,
366{
367    wrapper.get_payload_raw()
368}