1use crate::codec::{Codec, CodecName};
10use crate::erasure::error::{ErasureError, ErasureResult};
11use crate::erasure::neuron::{NeuronErased, NeuronErasedWrapper};
12use crate::logging::{LogTrace, TraceContext};
13use crate::payload::{Payload, PayloadRaw};
14use std::any::{Any, TypeId};
15use std::marker::PhantomData;
16use std::sync::Arc;
17use uuid::Uuid;
18
19pub trait PayloadErased: LogTrace + Send + Sync + 'static {
21 fn as_any(&self) -> &dyn Any;
22 fn get_trace_context(&self) -> TraceContext;
23 fn get_correlation_id(&self) -> Uuid {
24 self.get_trace_context().correlation_id
25 }
26 fn get_neuron_name(&self) -> String;
27 fn get_span_id(&self) -> u64 {
28 self.get_trace_context().span_id
29 }
30 fn get_parent_id(&self) -> Option<u64> {
31 self.get_trace_context().parent_id
32 }
33 fn payload_type_id(&self) -> TypeId;
34 fn codec_type_id(&self) -> TypeId;
35 fn clone_to_box(&self) -> Box<dyn PayloadErased + Send + Sync + 'static>;
36 fn clone_to_arc(&self) -> Arc<dyn PayloadErased + Send + Sync + 'static>;
37 fn get_erased_neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static>;
38 fn get_value(&self) -> &dyn std::any::Any;
39}
40
41pub trait PayloadRawErased: LogTrace + Send + Sync + 'static {
43 fn as_any(&self) -> &dyn Any;
44 fn get_bytes(&self) -> Arc<Vec<u8>>;
45 fn get_trace_context(&self) -> TraceContext;
46 fn get_correlation_id(&self) -> Uuid {
47 self.get_trace_context().correlation_id
48 }
49 fn get_neuron_name(&self) -> String;
50 fn get_span_id(&self) -> u64 {
51 self.get_trace_context().span_id
52 }
53 fn get_parent_id(&self) -> Option<u64> {
54 self.get_trace_context().parent_id
55 }
56 fn get_erased_neuron(&self) -> Option<Arc<dyn NeuronErased + Send + Sync + 'static>>;
57 fn payload_type_id(&self) -> TypeId;
58 fn codec_type_id(&self) -> TypeId;
59 fn clone_to_box(&self) -> Box<dyn PayloadRawErased + Send + Sync + 'static>;
60 fn clone_to_arc(&self) -> Arc<dyn PayloadRawErased + Send + Sync + 'static>;
61}
62
/// Raw erased payload described entirely by explicit fields.
///
/// Unlike a wrapper around a typed payload, this struct stores the bytes, the
/// neuron name, the trace context and both `TypeId`s directly.
#[derive(Debug)]
pub struct SimplePayloadRawErased {
    /// Shared byte buffer of the payload.
    pub bytes: Arc<Vec<u8>>,
    /// Name of the neuron associated with the payload.
    pub neuron_name: String,
    /// Trace context attached to the payload.
    pub trace: TraceContext,
    /// `TypeId` reported for the payload type.
    pub payload_type_id: TypeId,
    /// `TypeId` reported for the codec type.
    pub codec_type_id: TypeId,
}
73
74impl SimplePayloadRawErased {
75 pub fn new(
76 bytes: Arc<Vec<u8>>,
77 neuron_name: String,
78 trace: TraceContext,
79 payload_type_id: TypeId,
80 codec_type_id: TypeId,
81 ) -> Self {
82 Self {
83 bytes,
84 neuron_name,
85 trace,
86 payload_type_id,
87 codec_type_id,
88 }
89 }
90}
91
92impl PayloadRawErased for SimplePayloadRawErased {
93 fn as_any(&self) -> &dyn Any {
94 self
95 }
96 fn get_bytes(&self) -> Arc<Vec<u8>> {
97 self.bytes.clone()
98 }
99 fn get_trace_context(&self) -> TraceContext {
100 self.trace
101 }
102 fn get_neuron_name(&self) -> String {
103 self.neuron_name.clone()
104 }
105 fn get_erased_neuron(&self) -> Option<Arc<dyn NeuronErased + Send + Sync + 'static>> {
106 None
107 }
108 fn payload_type_id(&self) -> TypeId {
109 self.payload_type_id
110 }
111 fn codec_type_id(&self) -> TypeId {
112 self.codec_type_id
113 }
114 fn clone_to_box(&self) -> Box<dyn PayloadRawErased + Send + Sync + 'static> {
115 Box::new(Self {
116 bytes: self.bytes.clone(),
117 neuron_name: self.neuron_name.clone(),
118 trace: self.trace,
119 payload_type_id: self.payload_type_id,
120 codec_type_id: self.codec_type_id,
121 })
122 }
123 fn clone_to_arc(&self) -> Arc<dyn PayloadRawErased + Send + Sync + 'static> {
124 Arc::new(Self {
125 bytes: self.bytes.clone(),
126 neuron_name: self.neuron_name.clone(),
127 trace: self.trace,
128 payload_type_id: self.payload_type_id,
129 codec_type_id: self.codec_type_id,
130 })
131 }
132}
133
134#[derive(Debug)]
136pub struct PayloadErasedWrapper<T: 'static, C: 'static> {
137 payload: Arc<Payload<T, C>>,
138 _phantom: PhantomData<(T, C)>,
139}
140
141impl<T, C> PayloadErasedWrapper<T, C>
142where
143 T: Send + Sync + 'static,
144 C: Codec<T> + CodecName + Send + Sync + 'static,
145{
146 pub fn new(payload: Arc<Payload<T, C>>) -> Self {
147 Self {
148 payload,
149 _phantom: PhantomData,
150 }
151 }
152
153 pub fn from_typed_payload(
155 payload: Arc<Payload<T, C>>,
156 ) -> Arc<dyn PayloadErased + Send + Sync + 'static> {
157 Arc::new(Self::new(payload))
158 }
159
160 pub fn get_typed_payload(&self) -> Arc<Payload<T, C>> {
162 self.payload.clone()
163 }
164
165 pub fn get_erased_neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static> {
167 NeuronErasedWrapper::from_typed_neuron(self.payload.neuron.clone())
168 }
169
170 pub fn to_typed_payload<U, D>(&self) -> ErasureResult<Arc<Payload<U, D>>>
173 where
174 U: Send + Sync + 'static,
175 D: Send + Sync + 'static,
176 {
177 if let Some(wrapper) = (self as &dyn Any).downcast_ref::<PayloadErasedWrapper<U, D>>() {
179 Ok(wrapper.payload.clone())
180 } else {
181 Err(ErasureError::PayloadTypeMismatch {
182 expected_payload_type: TypeId::of::<U>(),
183 expected_codec_type: TypeId::of::<D>(),
184 actual_payload_type: TypeId::of::<T>(),
185 actual_codec_type: TypeId::of::<C>(),
186 })
187 }
188 }
189}
190
191impl<T, C> PayloadErased for PayloadErasedWrapper<T, C>
192where
193 T: Send + Sync + 'static,
194 C: Codec<T> + CodecName + Send + Sync + 'static,
195{
196 fn as_any(&self) -> &dyn Any {
197 self
198 }
199
200 fn get_trace_context(&self) -> TraceContext {
201 self.payload.trace
202 }
203
204 fn get_neuron_name(&self) -> String {
205 self.payload.neuron.name()
206 }
207
208 fn payload_type_id(&self) -> TypeId {
209 TypeId::of::<T>()
210 }
211
212 fn codec_type_id(&self) -> TypeId {
213 TypeId::of::<C>()
214 }
215
216 fn clone_to_box(&self) -> Box<dyn PayloadErased + Send + Sync + 'static> {
217 Box::new(PayloadErasedWrapper {
218 payload: self.payload.clone(),
219 _phantom: PhantomData,
220 })
221 }
222
223 fn clone_to_arc(&self) -> Arc<dyn PayloadErased + Send + Sync + 'static> {
224 Arc::new(PayloadErasedWrapper {
225 payload: self.payload.clone(),
226 _phantom: PhantomData,
227 })
228 }
229
230 fn get_erased_neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static> {
231 NeuronErasedWrapper::from_typed_neuron(self.payload.neuron.clone())
232 }
233
234 fn get_value(&self) -> &dyn std::any::Any {
235 &*self.payload.value as &dyn std::any::Any
236 }
237}
238
239#[derive(Debug)]
241pub struct PayloadRawErasedWrapper<T: 'static, C: 'static> {
242 payload: Arc<PayloadRaw<T, C>>,
243 _phantom: PhantomData<(T, C)>,
244}
245
246impl<T, C> PayloadRawErasedWrapper<T, C>
247where
248 T: Send + Sync + 'static,
249 C: Codec<T> + CodecName + Send + Sync + 'static,
250{
251 pub fn new(payload: Arc<PayloadRaw<T, C>>) -> Self {
252 Self {
253 payload,
254 _phantom: PhantomData,
255 }
256 }
257
258 pub fn from_payload_raw(
260 payload: Arc<PayloadRaw<T, C>>,
261 ) -> Arc<dyn PayloadRawErased + Send + Sync + 'static> {
262 Arc::new(Self::new(payload))
263 }
264
265 pub fn get_payload_raw(&self) -> Arc<PayloadRaw<T, C>> {
267 self.payload.clone()
268 }
269
270 pub fn to_typed_payload(&self) -> Arc<PayloadRaw<T, C>> {
272 self.payload.clone()
273 }
274}
275
276impl<T, C> PayloadRawErased for PayloadRawErasedWrapper<T, C>
277where
278 T: Send + Sync + 'static,
279 C: Codec<T> + CodecName + Send + Sync + 'static,
280{
281 fn as_any(&self) -> &dyn Any {
282 self
283 }
284
285 fn get_bytes(&self) -> Arc<Vec<u8>> {
286 self.payload.value.clone()
287 }
288
289 fn get_trace_context(&self) -> TraceContext {
290 self.payload.trace
291 }
292
293 fn get_neuron_name(&self) -> String {
294 match &self.payload.neuron {
295 Some(neuron) => neuron.name(),
296 None => "unknown".to_string(), }
298 }
299
300 fn get_erased_neuron(&self) -> Option<Arc<dyn NeuronErased + Send + Sync + 'static>> {
301 self.payload
302 .neuron
303 .as_ref()
304 .map(|n| NeuronErasedWrapper::from_typed_neuron(n.clone()))
305 }
306
307 fn payload_type_id(&self) -> TypeId {
308 TypeId::of::<T>()
309 }
310
311 fn codec_type_id(&self) -> TypeId {
312 TypeId::of::<C>()
313 }
314
315 fn clone_to_box(&self) -> Box<dyn PayloadRawErased + Send + Sync + 'static> {
316 Box::new(PayloadRawErasedWrapper {
317 payload: self.payload.clone(),
318 _phantom: PhantomData,
319 })
320 }
321
322 fn clone_to_arc(&self) -> Arc<dyn PayloadRawErased + Send + Sync + 'static> {
323 Arc::new(PayloadRawErasedWrapper {
324 payload: self.payload.clone(),
325 _phantom: PhantomData,
326 })
327 }
328}
329
330pub fn erase_payload<T, C>(
332 payload: Arc<Payload<T, C>>,
333) -> Arc<dyn PayloadErased + Send + Sync + 'static>
334where
335 T: Send + Sync + 'static,
336 C: Codec<T> + CodecName + Send + Sync + 'static,
337{
338 PayloadErasedWrapper::from_typed_payload(payload)
339}
340
341pub fn erase_payload_raw<T, C>(
343 payload: Arc<PayloadRaw<T, C>>,
344) -> Arc<dyn PayloadRawErased + Send + Sync + 'static>
345where
346 T: Send + Sync + 'static,
347 C: Codec<T> + CodecName + Send + Sync + 'static,
348{
349 PayloadRawErasedWrapper::from_payload_raw(payload)
350}
351
352pub fn unerase_payload<T, C>(wrapper: &PayloadErasedWrapper<T, C>) -> Arc<Payload<T, C>>
354where
355 T: Send + Sync + 'static,
356 C: Codec<T> + CodecName + Send + Sync + 'static,
357{
358 wrapper.get_typed_payload()
359}
360
361pub fn unerase_payload_raw<T, C>(wrapper: &PayloadRawErasedWrapper<T, C>) -> Arc<PayloadRaw<T, C>>
363where
364 T: Send + Sync + 'static,
365 C: Codec<T> + CodecName + Send + Sync + 'static,
366{
367 wrapper.get_payload_raw()
368}