1use crate::codec::{Codec, CodecName};
10use crate::erasure::error::{ErasureError, ErasureResult};
11use crate::erasure::neuron::{NeuronErased, erase_neuron};
12use crate::erasure::payload::{
13 PayloadErased, PayloadErasedWrapper, PayloadRawErased, PayloadRawErasedWrapper,
14};
15use crate::erasure::reactant::{ErrorReactantErased, ReactantErased, ReactantRawErased};
16use crate::payload::PayloadRaw;
17use crate::reactant::{ErrorReactant, Reactant, ReactantRaw};
18use crate::synapse::{SynapseError, SynapseExternal, SynapseInternal};
19use std::any::{Any, TypeId};
20use std::future::Future;
21use std::marker::PhantomData;
22use std::pin::Pin;
23use std::sync::{Arc, RwLock};
24
25pub trait SynapseInternalErased: Send + Sync + 'static {
26 fn as_any(&self) -> &dyn Any;
27 fn neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static>;
28 fn neuron_name(&self) -> String;
29 fn codec_name(&self) -> String;
30 fn neuron_schema(&self) -> String;
31 fn payload_type_id(&self) -> TypeId;
32 fn codec_type_id(&self) -> TypeId;
33 fn clone_to_box(&self) -> Box<dyn SynapseInternalErased + Send + Sync + 'static>;
34 fn clone_to_arc(&self) -> Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>;
35 #[allow(clippy::type_complexity)]
36 fn transduce_erased(
37 &self,
38 payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
39 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>>;
40 #[allow(clippy::type_complexity)]
41 fn transmit_erased(
42 &self,
43 payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
44 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>>;
45 fn react_erased(
46 &mut self,
47 reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
48 error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
49 );
50}
51
52pub trait SynapseExternalErased: Send + Sync + 'static {
53 fn as_any(&self) -> &dyn Any;
54 fn neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static>;
55 fn neuron_name(&self) -> String;
56 fn codec_name(&self) -> String;
57 fn neuron_schema(&self) -> String;
58 fn payload_type_id(&self) -> TypeId;
59 fn codec_type_id(&self) -> TypeId;
60 fn clone_to_box(&self) -> Box<dyn SynapseExternalErased + Send + Sync + 'static>;
61 fn clone_to_arc(&self) -> Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>;
62 #[allow(clippy::type_complexity)]
63 fn transduce_erased(
64 &self,
65 payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
66 ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static>>;
67 #[allow(clippy::type_complexity)]
68 fn transmit_erased(
69 &self,
70 payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
71 ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static>>;
72 fn react_erased(
73 &mut self,
74 reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
75 raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
76 error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
77 );
78}
79
80pub struct SynapseInternalErasedWrapper<T, C, S> {
82 synapse: Arc<RwLock<S>>,
83 erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
84 _phantom: PhantomData<(T, C)>,
85}
86
87impl<T, C, S> SynapseInternalErasedWrapper<T, C, S>
88where
89 T: Send + Sync + 'static,
90 C: Codec<T> + CodecName + Send + Sync + 'static,
91 S: SynapseInternal<T, C> + Send + Sync + 'static,
92{
93 pub fn new(synapse: S) -> Self {
94 Self {
95 synapse: Arc::new(RwLock::new(synapse)),
96 erased_reactants: Vec::new(),
97 _phantom: PhantomData,
98 }
99 }
100
101 pub fn from_typed_synapse(
103 synapse: S,
104 ) -> Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>
105 where
106 T: 'static,
107 C: 'static,
108 S: 'static,
109 {
110 Arc::new(RwLock::new(Self::new(synapse)))
111 }
112
113 pub fn to_typed_synapse<U, D, R>(&self) -> ErasureResult<Arc<RwLock<R>>>
115 where
116 U: Send + Sync + 'static,
117 D: Codec<U> + CodecName + Send + Sync + 'static,
118 R: SynapseInternal<U, D> + Send + Sync + 'static,
119 {
120 if TypeId::of::<T>() == TypeId::of::<U>() && TypeId::of::<C>() == TypeId::of::<D>() {
121 unsafe {
123 Ok(std::mem::transmute::<Arc<RwLock<S>>, Arc<RwLock<R>>>(
124 self.synapse.clone(),
125 ))
126 }
127 } else {
128 Err(ErasureError::SynapseTypeMismatch {
129 expected_payload_type: TypeId::of::<U>(),
130 expected_codec_type: TypeId::of::<D>(),
131 actual_payload_type: TypeId::of::<T>(),
132 actual_codec_type: TypeId::of::<C>(),
133 })
134 }
135 }
136}
137
138impl<T, C, S> SynapseInternalErased for SynapseInternalErasedWrapper<T, C, S>
139where
140 T: Send + Sync + 'static,
141 C: Codec<T> + CodecName + Send + Sync + 'static,
142 S: SynapseInternal<T, C> + Send + Sync + 'static,
143{
144 fn as_any(&self) -> &dyn Any {
145 self
146 }
147
148 fn neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static> {
149 erase_neuron(self.synapse.read().unwrap().neuron())
150 }
151
152 fn transduce_erased(
153 &self,
154 payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
155 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
156 if let Some(wrapper) = payload
158 .as_any()
159 .downcast_ref::<PayloadErasedWrapper<T, C>>()
160 {
161 let typed_payload = wrapper.get_typed_payload();
162 let synapse_arc = self.synapse.clone();
163
164 let future = {
166 let synapse = synapse_arc.read().unwrap();
167 synapse.transduce(typed_payload)
168 };
169
170 Box::pin(future)
171 } else {
172 let neuron_name = self.neuron_name();
174 Box::pin(async move { Err(SynapseError::NeuronTypeConversion { neuron_name }) })
175 }
176 }
177
178 fn transmit_erased(
179 &self,
180 payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
181 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
182 if let Some(wrapper) = payload
184 .as_any()
185 .downcast_ref::<PayloadErasedWrapper<T, C>>()
186 {
187 let typed_payload = wrapper.get_typed_payload();
188 let synapse_arc = self.synapse.clone();
189
190 let future = {
192 let synapse = synapse_arc.read().unwrap();
193 synapse.transmit(typed_payload)
194 };
195
196 Box::pin(future)
197 } else {
198 let neuron_name = self.neuron_name();
200 Box::pin(async move { Err(SynapseError::NeuronTypeConversion { neuron_name }) })
201 }
202 }
203
204 fn react_erased(
205 &mut self,
206 reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
207 error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
208 ) {
209 if reactants.is_empty() && error_reactants.is_empty() {
210 return; }
212
213 let typed_reactants: Vec<_> = reactants
214 .into_iter()
215 .filter_map(|erased_reactant| {
216 if erased_reactant.payload_type_id() != TypeId::of::<T>()
218 || erased_reactant.codec_type_id() != TypeId::of::<C>()
219 {
220 return None;
221 }
222
223 let any_arc = erased_reactant.clone_to_any();
225
226 any_arc
228 .downcast::<Arc<dyn Reactant<T, C> + Send + Sync + 'static>>()
229 .ok()
230 .map(|boxed_arc| (*boxed_arc).clone())
231 })
232 .collect();
233
234 let typed_error_reactants: Vec<_> = error_reactants
235 .into_iter()
236 .filter_map(|erased_reactant| {
237 if erased_reactant.payload_type_id() != TypeId::of::<T>()
238 || erased_reactant.codec_type_id() != TypeId::of::<C>()
239 {
240 return None;
241 }
242 let any_arc = erased_reactant.clone_to_any();
243 any_arc
244 .downcast::<Arc<dyn ErrorReactant<T, C> + Send + Sync + 'static>>()
245 .ok()
246 .map(|boxed_arc| (*boxed_arc).clone())
247 })
248 .collect();
249
250 if !typed_reactants.is_empty() || !typed_error_reactants.is_empty() {
252 let _ = self
253 .synapse
254 .write()
255 .unwrap()
256 .react(typed_reactants, typed_error_reactants);
257 }
258 }
259
260 fn neuron_name(&self) -> String {
261 self.synapse.read().unwrap().neuron().name()
262 }
263
264 fn codec_name(&self) -> String {
265 C::name().to_string()
266 }
267
268 fn neuron_schema(&self) -> String {
269 self.synapse.read().unwrap().neuron().schema()
270 }
271
272 fn payload_type_id(&self) -> TypeId {
273 TypeId::of::<T>()
274 }
275
276 fn codec_type_id(&self) -> TypeId {
277 TypeId::of::<C>()
278 }
279
280 fn clone_to_box(&self) -> Box<dyn SynapseInternalErased + Send + Sync + 'static> {
281 Box::new(Self {
282 synapse: self.synapse.clone(),
283 erased_reactants: self.erased_reactants.clone(),
284 _phantom: PhantomData,
285 })
286 }
287
288 fn clone_to_arc(&self) -> Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>> {
289 Arc::new(RwLock::new(Self {
290 synapse: self.synapse.clone(),
291 erased_reactants: self.erased_reactants.clone(),
292 _phantom: PhantomData,
293 }))
294 }
295}
296
297pub struct SynapseExternalErasedWrapper<T, C, S> {
299 synapse: Arc<RwLock<S>>,
300 _phantom: PhantomData<(T, C)>,
301}
302
303impl<T, C, S> SynapseExternalErasedWrapper<T, C, S>
304where
305 T: Send + Sync + 'static,
306 C: Codec<T> + CodecName + Send + Sync + 'static,
307 S: SynapseExternal<T, C> + Send + Sync + 'static,
308{
309 pub fn new(synapse: S) -> Self {
310 Self {
311 synapse: Arc::new(RwLock::new(synapse)),
312 _phantom: PhantomData,
313 }
314 }
315
316 pub fn from_typed_synapse(
318 synapse: S,
319 ) -> Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>
320 where
321 T: 'static,
322 C: 'static,
323 S: 'static,
324 {
325 Arc::new(RwLock::new(Self::new(synapse)))
326 }
327
328 pub fn to_typed_synapse<U, D, R>(&self) -> ErasureResult<Arc<RwLock<R>>>
330 where
331 U: Send + Sync + 'static,
332 D: Codec<U> + CodecName + Send + Sync + 'static,
333 R: SynapseExternal<U, D> + Send + Sync + 'static,
334 {
335 if TypeId::of::<T>() == TypeId::of::<U>() && TypeId::of::<C>() == TypeId::of::<D>() {
336 unsafe {
338 Ok(std::mem::transmute::<Arc<RwLock<S>>, Arc<RwLock<R>>>(
339 self.synapse.clone(),
340 ))
341 }
342 } else {
343 Err(ErasureError::SynapseTypeMismatch {
344 expected_payload_type: TypeId::of::<U>(),
345 expected_codec_type: TypeId::of::<D>(),
346 actual_payload_type: TypeId::of::<T>(),
347 actual_codec_type: TypeId::of::<C>(),
348 })
349 }
350 }
351}
352
353impl<T, C, S> SynapseExternalErased for SynapseExternalErasedWrapper<T, C, S>
354where
355 T: Send + Sync + 'static,
356 C: Codec<T> + CodecName + Send + Sync + 'static,
357 S: SynapseExternal<T, C> + Send + Sync + 'static,
358{
359 fn as_any(&self) -> &dyn Any {
360 self
361 }
362
363 fn neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static> {
364 erase_neuron(self.synapse.read().unwrap().neuron())
365 }
366
367 fn transduce_erased(
368 &self,
369 payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
370 ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static>>
371 {
372 if let Some(wrapper) = payload
374 .as_any()
375 .downcast_ref::<PayloadRawErasedWrapper<T, C>>()
376 {
377 let typed_payload = wrapper.get_payload_raw();
378 let synapse_arc = self.synapse.clone();
379
380 let future = {
382 let synapse = synapse_arc.read().unwrap();
383 synapse.transduce(typed_payload)
384 };
385
386 Box::pin(future)
387 } else {
388 let synapse_arc = self.synapse.clone();
391 let bytes = payload.get_bytes();
392 let trace = payload.get_trace_context();
393
394 let future_res = {
396 let synapse = synapse_arc.read().unwrap();
397 let typed_payload = Arc::new(PayloadRaw::from_parts(
398 bytes,
399 Some(synapse.neuron().clone()),
400 trace,
401 ));
402 synapse.transduce(typed_payload)
403 };
404
405 Box::pin(future_res)
406 }
407 }
408
409 fn transmit_erased(
410 &self,
411 payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
412 ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static>>
413 {
414 if let Some(wrapper) = payload
416 .as_any()
417 .downcast_ref::<PayloadRawErasedWrapper<T, C>>()
418 {
419 let typed_payload = wrapper.get_payload_raw();
420 let synapse_arc = self.synapse.clone();
421
422 let future = {
424 let synapse = synapse_arc.read().unwrap();
425 synapse.transmit(typed_payload)
426 };
427
428 Box::pin(future)
429 } else {
430 let synapse_arc = self.synapse.clone();
433 let bytes = payload.get_bytes();
434 let trace = payload.get_trace_context();
435
436 let future_res = {
438 let synapse = synapse_arc.read().unwrap();
439 let typed_payload = Arc::new(PayloadRaw::from_parts(
440 bytes,
441 Some(synapse.neuron().clone()),
442 trace,
443 ));
444 synapse.transmit(typed_payload)
445 };
446
447 Box::pin(future_res)
448 }
449 }
450
451 fn react_erased(
452 &mut self,
453 reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
454 raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
455 error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
456 ) {
457 if reactants.is_empty() && raw_reactants.is_empty() && error_reactants.is_empty() {
458 return; }
460
461 let typed_reactants: Vec<_> = reactants
462 .into_iter()
463 .filter_map(|erased_reactant| {
464 if erased_reactant.payload_type_id() != TypeId::of::<T>()
466 || erased_reactant.codec_type_id() != TypeId::of::<C>()
467 {
468 return None;
469 }
470
471 let any_arc = erased_reactant.clone_to_any();
473
474 any_arc
476 .downcast::<Arc<dyn Reactant<T, C> + Send + Sync + 'static>>()
477 .ok()
478 .map(|boxed_arc| (*boxed_arc).clone())
479 })
480 .collect();
481
482 let typed_raw_reactants: Vec<_> = raw_reactants
483 .into_iter()
484 .filter_map(|erased_reactant| {
485 if erased_reactant.payload_type_id() != TypeId::of::<T>()
487 || erased_reactant.codec_type_id() != TypeId::of::<C>()
488 {
489 return None;
490 }
491
492 let any_arc = erased_reactant.clone_to_any();
493 any_arc
494 .downcast::<Arc<dyn ReactantRaw<T, C> + Send + Sync + 'static>>()
495 .ok()
496 .map(|boxed_arc| (*boxed_arc).clone())
497 })
498 .collect();
499
500 let typed_error_reactants: Vec<_> = error_reactants
501 .into_iter()
502 .filter_map(|erased_reactant| {
503 if erased_reactant.payload_type_id() != TypeId::of::<T>()
504 || erased_reactant.codec_type_id() != TypeId::of::<C>()
505 {
506 return None;
507 }
508 let any_arc = erased_reactant.clone_to_any();
509 any_arc
510 .downcast::<Arc<dyn ErrorReactant<T, C> + Send + Sync + 'static>>()
511 .ok()
512 .map(|boxed_arc| (*boxed_arc).clone())
513 })
514 .collect();
515
516 if !typed_reactants.is_empty()
518 || !typed_raw_reactants.is_empty()
519 || !typed_error_reactants.is_empty()
520 {
521 let _ = self
522 .synapse
523 .write()
524 .unwrap()
525 .react(typed_reactants, typed_raw_reactants, typed_error_reactants);
526 }
527 }
528
529 fn neuron_name(&self) -> String {
530 self.synapse.read().unwrap().neuron().name()
531 }
532
533 fn codec_name(&self) -> String {
534 C::name().to_string()
535 }
536
537 fn neuron_schema(&self) -> String {
538 self.synapse.read().unwrap().neuron().schema()
539 }
540
541 fn payload_type_id(&self) -> TypeId {
542 TypeId::of::<T>()
543 }
544
545 fn codec_type_id(&self) -> TypeId {
546 TypeId::of::<C>()
547 }
548
549 fn clone_to_box(&self) -> Box<dyn SynapseExternalErased + Send + Sync + 'static> {
550 Box::new(Self {
551 synapse: self.synapse.clone(),
552 _phantom: PhantomData,
553 })
554 }
555
556 fn clone_to_arc(&self) -> Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>> {
557 Arc::new(RwLock::new(Self {
558 synapse: self.synapse.clone(),
559 _phantom: PhantomData,
560 }))
561 }
562}
563
564pub fn erase_synapse_internal<T, C, S>(
566 synapse: S,
567) -> Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>
568where
569 T: Send + Sync + 'static,
570 C: Codec<T> + CodecName + Send + Sync + 'static,
571 S: SynapseInternal<T, C> + Send + Sync + 'static,
572{
573 SynapseInternalErasedWrapper::from_typed_synapse(synapse)
574}
575
576pub fn erase_synapse_external<T, C, S>(
578 synapse: S,
579) -> Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>
580where
581 T: Send + Sync + 'static,
582 C: Codec<T> + CodecName + Send + Sync + 'static,
583 S: SynapseExternal<T, C> + Send + Sync + 'static,
584{
585 SynapseExternalErasedWrapper::from_typed_synapse(synapse)
586}