Skip to main content

plexor_core/erasure/
synapse.rs

1// Copyright 2025 Alecks Gates
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7//! Type-erased versions of synapse traits.
8
9use crate::codec::{Codec, CodecName};
10use crate::erasure::error::{ErasureError, ErasureResult};
11use crate::erasure::neuron::{NeuronErased, erase_neuron};
12use crate::erasure::payload::{
13    PayloadErased, PayloadErasedWrapper, PayloadRawErased, PayloadRawErasedWrapper,
14};
15use crate::erasure::reactant::{ErrorReactantErased, ReactantErased, ReactantRawErased};
16use crate::payload::PayloadRaw;
17use crate::reactant::{ErrorReactant, Reactant, ReactantRaw};
18use crate::synapse::{SynapseError, SynapseExternal, SynapseInternal};
19use std::any::{Any, TypeId};
20use std::future::Future;
21use std::marker::PhantomData;
22use std::pin::Pin;
23use std::sync::{Arc, RwLock};
24
25pub trait SynapseInternalErased: Send + Sync + 'static {
26    fn as_any(&self) -> &dyn Any;
27    fn neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static>;
28    fn neuron_name(&self) -> String;
29    fn codec_name(&self) -> String;
30    fn neuron_schema(&self) -> String;
31    fn payload_type_id(&self) -> TypeId;
32    fn codec_type_id(&self) -> TypeId;
33    fn clone_to_box(&self) -> Box<dyn SynapseInternalErased + Send + Sync + 'static>;
34    fn clone_to_arc(&self) -> Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>;
35    #[allow(clippy::type_complexity)]
36    fn transduce_erased(
37        &self,
38        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
39    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>>;
40    #[allow(clippy::type_complexity)]
41    fn transmit_erased(
42        &self,
43        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
44    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>>;
45    fn react_erased(
46        &mut self,
47        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
48        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
49    );
50}
51
52pub trait SynapseExternalErased: Send + Sync + 'static {
53    fn as_any(&self) -> &dyn Any;
54    fn neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static>;
55    fn neuron_name(&self) -> String;
56    fn codec_name(&self) -> String;
57    fn neuron_schema(&self) -> String;
58    fn payload_type_id(&self) -> TypeId;
59    fn codec_type_id(&self) -> TypeId;
60    fn clone_to_box(&self) -> Box<dyn SynapseExternalErased + Send + Sync + 'static>;
61    fn clone_to_arc(&self) -> Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>;
62    #[allow(clippy::type_complexity)]
63    fn transduce_erased(
64        &self,
65        payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
66    ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static>>;
67    #[allow(clippy::type_complexity)]
68    fn transmit_erased(
69        &self,
70        payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
71    ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static>>;
72    fn react_erased(
73        &mut self,
74        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
75        raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
76        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
77    );
78}
79
80/// Wrapper that implements SynapseInternalErased for any concrete SynapseInternal
81pub struct SynapseInternalErasedWrapper<T, C, S> {
82    synapse: Arc<RwLock<S>>,
83    erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
84    _phantom: PhantomData<(T, C)>,
85}
86
87impl<T, C, S> SynapseInternalErasedWrapper<T, C, S>
88where
89    T: Send + Sync + 'static,
90    C: Codec<T> + CodecName + Send + Sync + 'static,
91    S: SynapseInternal<T, C> + Send + Sync + 'static,
92{
93    pub fn new(synapse: S) -> Self {
94        Self {
95            synapse: Arc::new(RwLock::new(synapse)),
96            erased_reactants: Vec::new(),
97            _phantom: PhantomData,
98        }
99    }
100
101    /// Create a type-erased synapse internal from a correctly typed synapse internal
102    pub fn from_typed_synapse(
103        synapse: S,
104    ) -> Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>
105    where
106        T: 'static,
107        C: 'static,
108        S: 'static,
109    {
110        Arc::new(RwLock::new(Self::new(synapse)))
111    }
112
113    /// Safely attempt to downcast an erased synapse to its concrete type.
114    pub fn to_typed_synapse<U, D, R>(&self) -> ErasureResult<Arc<RwLock<R>>>
115    where
116        U: Send + Sync + 'static,
117        D: Codec<U> + CodecName + Send + Sync + 'static,
118        R: SynapseInternal<U, D> + Send + Sync + 'static,
119    {
120        if TypeId::of::<T>() == TypeId::of::<U>() && TypeId::of::<C>() == TypeId::of::<D>() {
121            // Safety: Types are checked by TypeId
122            unsafe {
123                Ok(std::mem::transmute::<Arc<RwLock<S>>, Arc<RwLock<R>>>(
124                    self.synapse.clone(),
125                ))
126            }
127        } else {
128            Err(ErasureError::SynapseTypeMismatch {
129                expected_payload_type: TypeId::of::<U>(),
130                expected_codec_type: TypeId::of::<D>(),
131                actual_payload_type: TypeId::of::<T>(),
132                actual_codec_type: TypeId::of::<C>(),
133            })
134        }
135    }
136}
137
138impl<T, C, S> SynapseInternalErased for SynapseInternalErasedWrapper<T, C, S>
139where
140    T: Send + Sync + 'static,
141    C: Codec<T> + CodecName + Send + Sync + 'static,
142    S: SynapseInternal<T, C> + Send + Sync + 'static,
143{
144    fn as_any(&self) -> &dyn Any {
145        self
146    }
147
148    fn neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static> {
149        erase_neuron(self.synapse.read().unwrap().neuron())
150    }
151
152    fn transduce_erased(
153        &self,
154        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
155    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
156        // Try to downcast the erased payload to its concrete wrapper safely
157        if let Some(wrapper) = payload
158            .as_any()
159            .downcast_ref::<PayloadErasedWrapper<T, C>>()
160        {
161            let typed_payload = wrapper.get_typed_payload();
162            let synapse_arc = self.synapse.clone();
163
164            // Extract the future before the await point to avoid holding the guard across awaits
165            let future = {
166                let synapse = synapse_arc.read().unwrap();
167                synapse.transduce(typed_payload)
168            };
169
170            Box::pin(future)
171        } else {
172            // If the types don't match, return an error
173            let neuron_name = self.neuron_name();
174            Box::pin(async move { Err(SynapseError::NeuronTypeConversion { neuron_name }) })
175        }
176    }
177
178    fn transmit_erased(
179        &self,
180        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
181    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
182        // Try to downcast the erased payload to its concrete wrapper safely
183        if let Some(wrapper) = payload
184            .as_any()
185            .downcast_ref::<PayloadErasedWrapper<T, C>>()
186        {
187            let typed_payload = wrapper.get_typed_payload();
188            let synapse_arc = self.synapse.clone();
189
190            // Extract the future before the await point to avoid holding the guard across awaits
191            let future = {
192                let synapse = synapse_arc.read().unwrap();
193                synapse.transmit(typed_payload)
194            };
195
196            Box::pin(future)
197        } else {
198            // If the types don't match, return an error
199            let neuron_name = self.neuron_name();
200            Box::pin(async move { Err(SynapseError::NeuronTypeConversion { neuron_name }) })
201        }
202    }
203
204    fn react_erased(
205        &mut self,
206        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
207        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
208    ) {
209        if reactants.is_empty() && error_reactants.is_empty() {
210            return; // No reactants to process
211        }
212
213        let typed_reactants: Vec<_> = reactants
214            .into_iter()
215            .filter_map(|erased_reactant| {
216                // First, check if the types match to avoid unnecessary work.
217                if erased_reactant.payload_type_id() != TypeId::of::<T>()
218                    || erased_reactant.codec_type_id() != TypeId::of::<C>()
219                {
220                    return None;
221                }
222
223                // The types match, so clone into an `Any` Arc.
224                let any_arc = erased_reactant.clone_to_any();
225
226                // Attempt to downcast to the typed Arc we need.
227                any_arc
228                    .downcast::<Arc<dyn Reactant<T, C> + Send + Sync + 'static>>()
229                    .ok()
230                    .map(|boxed_arc| (*boxed_arc).clone())
231            })
232            .collect();
233
234        let typed_error_reactants: Vec<_> = error_reactants
235            .into_iter()
236            .filter_map(|erased_reactant| {
237                if erased_reactant.payload_type_id() != TypeId::of::<T>()
238                    || erased_reactant.codec_type_id() != TypeId::of::<C>()
239                {
240                    return None;
241                }
242                let any_arc = erased_reactant.clone_to_any();
243                any_arc
244                    .downcast::<Arc<dyn ErrorReactant<T, C> + Send + Sync + 'static>>()
245                    .ok()
246                    .map(|boxed_arc| (*boxed_arc).clone())
247            })
248            .collect();
249
250        // Now we can call react on the underlying synapse with mutable access through RwLock
251        if !typed_reactants.is_empty() || !typed_error_reactants.is_empty() {
252            let _ = self
253                .synapse
254                .write()
255                .unwrap()
256                .react(typed_reactants, typed_error_reactants);
257        }
258    }
259
260    fn neuron_name(&self) -> String {
261        self.synapse.read().unwrap().neuron().name()
262    }
263
264    fn codec_name(&self) -> String {
265        C::name().to_string()
266    }
267
268    fn neuron_schema(&self) -> String {
269        self.synapse.read().unwrap().neuron().schema()
270    }
271
272    fn payload_type_id(&self) -> TypeId {
273        TypeId::of::<T>()
274    }
275
276    fn codec_type_id(&self) -> TypeId {
277        TypeId::of::<C>()
278    }
279
280    fn clone_to_box(&self) -> Box<dyn SynapseInternalErased + Send + Sync + 'static> {
281        Box::new(Self {
282            synapse: self.synapse.clone(),
283            erased_reactants: self.erased_reactants.clone(),
284            _phantom: PhantomData,
285        })
286    }
287
288    fn clone_to_arc(&self) -> Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>> {
289        Arc::new(RwLock::new(Self {
290            synapse: self.synapse.clone(),
291            erased_reactants: self.erased_reactants.clone(),
292            _phantom: PhantomData,
293        }))
294    }
295}
296
297/// Wrapper that implements SynapseExternalErased for any concrete SynapseExternal
298pub struct SynapseExternalErasedWrapper<T, C, S> {
299    synapse: Arc<RwLock<S>>,
300    _phantom: PhantomData<(T, C)>,
301}
302
303impl<T, C, S> SynapseExternalErasedWrapper<T, C, S>
304where
305    T: Send + Sync + 'static,
306    C: Codec<T> + CodecName + Send + Sync + 'static,
307    S: SynapseExternal<T, C> + Send + Sync + 'static,
308{
309    pub fn new(synapse: S) -> Self {
310        Self {
311            synapse: Arc::new(RwLock::new(synapse)),
312            _phantom: PhantomData,
313        }
314    }
315
316    /// Create a type-erased external synapse from a correctly typed external synapse
317    pub fn from_typed_synapse(
318        synapse: S,
319    ) -> Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>
320    where
321        T: 'static,
322        C: 'static,
323        S: 'static,
324    {
325        Arc::new(RwLock::new(Self::new(synapse)))
326    }
327
328    /// Safely attempt to downcast an erased external synapse to its concrete type.
329    pub fn to_typed_synapse<U, D, R>(&self) -> ErasureResult<Arc<RwLock<R>>>
330    where
331        U: Send + Sync + 'static,
332        D: Codec<U> + CodecName + Send + Sync + 'static,
333        R: SynapseExternal<U, D> + Send + Sync + 'static,
334    {
335        if TypeId::of::<T>() == TypeId::of::<U>() && TypeId::of::<C>() == TypeId::of::<D>() {
336            // Safety: Types are checked by TypeId
337            unsafe {
338                Ok(std::mem::transmute::<Arc<RwLock<S>>, Arc<RwLock<R>>>(
339                    self.synapse.clone(),
340                ))
341            }
342        } else {
343            Err(ErasureError::SynapseTypeMismatch {
344                expected_payload_type: TypeId::of::<U>(),
345                expected_codec_type: TypeId::of::<D>(),
346                actual_payload_type: TypeId::of::<T>(),
347                actual_codec_type: TypeId::of::<C>(),
348            })
349        }
350    }
351}
352
353impl<T, C, S> SynapseExternalErased for SynapseExternalErasedWrapper<T, C, S>
354where
355    T: Send + Sync + 'static,
356    C: Codec<T> + CodecName + Send + Sync + 'static,
357    S: SynapseExternal<T, C> + Send + Sync + 'static,
358{
359    fn as_any(&self) -> &dyn Any {
360        self
361    }
362
363    fn neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static> {
364        erase_neuron(self.synapse.read().unwrap().neuron())
365    }
366
367    fn transduce_erased(
368        &self,
369        payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
370    ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static>>
371    {
372        // Try to downcast the erased payload to its concrete wrapper safely
373        if let Some(wrapper) = payload
374            .as_any()
375            .downcast_ref::<PayloadRawErasedWrapper<T, C>>()
376        {
377            let typed_payload = wrapper.get_payload_raw();
378            let synapse_arc = self.synapse.clone();
379
380            // Extract the future before the await point to avoid holding the guard across awaits
381            let future = {
382                let synapse = synapse_arc.read().unwrap();
383                synapse.transduce(typed_payload)
384            };
385
386            Box::pin(future)
387        } else {
388            // It might be a SimplePayloadRawErased or another implementation.
389            // We create a typed PayloadRaw using the synapse's neuron.
390            let synapse_arc = self.synapse.clone();
391            let bytes = payload.get_bytes();
392            let trace = payload.get_trace_context();
393
394            // Acquire lock to get neuron and create the future
395            let future_res = {
396                let synapse = synapse_arc.read().unwrap();
397                let typed_payload = Arc::new(PayloadRaw::from_parts(
398                    bytes,
399                    Some(synapse.neuron().clone()),
400                    trace,
401                ));
402                synapse.transduce(typed_payload)
403            };
404
405            Box::pin(future_res)
406        }
407    }
408
409    fn transmit_erased(
410        &self,
411        payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
412    ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static>>
413    {
414        // Try to downcast the erased payload to its concrete wrapper safely
415        if let Some(wrapper) = payload
416            .as_any()
417            .downcast_ref::<PayloadRawErasedWrapper<T, C>>()
418        {
419            let typed_payload = wrapper.get_payload_raw();
420            let synapse_arc = self.synapse.clone();
421
422            // Extract the future before the await point to avoid holding the guard across awaits
423            let future = {
424                let synapse = synapse_arc.read().unwrap();
425                synapse.transmit(typed_payload)
426            };
427
428            Box::pin(future)
429        } else {
430            // It might be a SimplePayloadRawErased or another implementation.
431            // We create a typed PayloadRaw using the synapse's neuron.
432            let synapse_arc = self.synapse.clone();
433            let bytes = payload.get_bytes();
434            let trace = payload.get_trace_context();
435
436            // Acquire lock to get neuron and create the future
437            let future_res = {
438                let synapse = synapse_arc.read().unwrap();
439                let typed_payload = Arc::new(PayloadRaw::from_parts(
440                    bytes,
441                    Some(synapse.neuron().clone()),
442                    trace,
443                ));
444                synapse.transmit(typed_payload)
445            };
446
447            Box::pin(future_res)
448        }
449    }
450
451    fn react_erased(
452        &mut self,
453        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
454        raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
455        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
456    ) {
457        if reactants.is_empty() && raw_reactants.is_empty() && error_reactants.is_empty() {
458            return; // No reactants to process
459        }
460
461        let typed_reactants: Vec<_> = reactants
462            .into_iter()
463            .filter_map(|erased_reactant| {
464                // First, check if the types match to avoid unnecessary work.
465                if erased_reactant.payload_type_id() != TypeId::of::<T>()
466                    || erased_reactant.codec_type_id() != TypeId::of::<C>()
467                {
468                    return None;
469                }
470
471                // The types match, so clone into an `Any` Arc.
472                let any_arc = erased_reactant.clone_to_any();
473
474                // Attempt to downcast to the typed Arc we need.
475                any_arc
476                    .downcast::<Arc<dyn Reactant<T, C> + Send + Sync + 'static>>()
477                    .ok()
478                    .map(|boxed_arc| (*boxed_arc).clone())
479            })
480            .collect();
481
482        let typed_raw_reactants: Vec<_> = raw_reactants
483            .into_iter()
484            .filter_map(|erased_reactant| {
485                // First, check if the types match to avoid unnecessary work.
486                if erased_reactant.payload_type_id() != TypeId::of::<T>()
487                    || erased_reactant.codec_type_id() != TypeId::of::<C>()
488                {
489                    return None;
490                }
491
492                let any_arc = erased_reactant.clone_to_any();
493                any_arc
494                    .downcast::<Arc<dyn ReactantRaw<T, C> + Send + Sync + 'static>>()
495                    .ok()
496                    .map(|boxed_arc| (*boxed_arc).clone())
497            })
498            .collect();
499
500        let typed_error_reactants: Vec<_> = error_reactants
501            .into_iter()
502            .filter_map(|erased_reactant| {
503                if erased_reactant.payload_type_id() != TypeId::of::<T>()
504                    || erased_reactant.codec_type_id() != TypeId::of::<C>()
505                {
506                    return None;
507                }
508                let any_arc = erased_reactant.clone_to_any();
509                any_arc
510                    .downcast::<Arc<dyn ErrorReactant<T, C> + Send + Sync + 'static>>()
511                    .ok()
512                    .map(|boxed_arc| (*boxed_arc).clone())
513            })
514            .collect();
515
516        // Now we can call react on the underlying synapse with mutable access through RwLock
517        if !typed_reactants.is_empty()
518            || !typed_raw_reactants.is_empty()
519            || !typed_error_reactants.is_empty()
520        {
521            let _ = self
522                .synapse
523                .write()
524                .unwrap()
525                .react(typed_reactants, typed_raw_reactants, typed_error_reactants);
526        }
527    }
528
529    fn neuron_name(&self) -> String {
530        self.synapse.read().unwrap().neuron().name()
531    }
532
533    fn codec_name(&self) -> String {
534        C::name().to_string()
535    }
536
537    fn neuron_schema(&self) -> String {
538        self.synapse.read().unwrap().neuron().schema()
539    }
540
541    fn payload_type_id(&self) -> TypeId {
542        TypeId::of::<T>()
543    }
544
545    fn codec_type_id(&self) -> TypeId {
546        TypeId::of::<C>()
547    }
548
549    fn clone_to_box(&self) -> Box<dyn SynapseExternalErased + Send + Sync + 'static> {
550        Box::new(Self {
551            synapse: self.synapse.clone(),
552            _phantom: PhantomData,
553        })
554    }
555
556    fn clone_to_arc(&self) -> Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>> {
557        Arc::new(RwLock::new(Self {
558            synapse: self.synapse.clone(),
559            _phantom: PhantomData,
560        }))
561    }
562}
563
564/// Convenience function to erase a concrete internal synapse
565pub fn erase_synapse_internal<T, C, S>(
566    synapse: S,
567) -> Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>
568where
569    T: Send + Sync + 'static,
570    C: Codec<T> + CodecName + Send + Sync + 'static,
571    S: SynapseInternal<T, C> + Send + Sync + 'static,
572{
573    SynapseInternalErasedWrapper::from_typed_synapse(synapse)
574}
575
576/// Convenience function to erase a concrete external synapse
577pub fn erase_synapse_external<T, C, S>(
578    synapse: S,
579) -> Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>
580where
581    T: Send + Sync + 'static,
582    C: Codec<T> + CodecName + Send + Sync + 'static,
583    S: SynapseExternal<T, C> + Send + Sync + 'static,
584{
585    SynapseExternalErasedWrapper::from_typed_synapse(synapse)
586}