Skip to main content

plexor_core/erasure/
synapse.rs

1// Copyright 2025 Alecks Gates
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7//! Type-erased versions of synapse traits.
8
9use crate::codec::{Codec, CodecName};
10use crate::erasure::error::{ErasureError, ErasureResult};
11use crate::erasure::neuron::{NeuronErased, erase_neuron};
12use crate::erasure::payload::{
13    PayloadErased, PayloadErasedWrapper, PayloadRawErased, PayloadRawErasedWrapper,
14};
15use crate::erasure::reactant::{ErrorReactantErased, ReactantErased, ReactantRawErased};
16use crate::payload::PayloadRaw;
17use crate::reactant::{ErrorReactant, Reactant, ReactantRaw};
18use crate::synapse::{RawSender, SynapseError, SynapseExternal, SynapseInternal};
19use std::any::{Any, TypeId};
20use std::future::Future;
21use std::marker::PhantomData;
22use std::pin::Pin;
23use std::sync::Arc;
24use parking_lot::RwLock;
25
/// Object-safe, type-erased interface over an internal synapse.
///
/// Implemented in this file by [`SimpleSynapseInternalErased`] and by
/// [`SynapseInternalErasedWrapper`], which forwards to a typed `SynapseInternal<T, C>`.
pub trait SynapseInternalErased: Send + Sync + 'static {
    /// Exposes the implementor as `&dyn Any`.
    fn as_any(&self) -> &dyn Any;
    /// Returns the type-erased neuron this synapse is associated with.
    fn neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static>;
    /// Returns the name reported by the associated neuron.
    fn neuron_name(&self) -> String;
    /// Returns the codec name; `SimpleSynapseInternalErased` reports the literal `"erased"`,
    /// the generic wrapper reports `C::name()`.
    fn codec_name(&self) -> String;
    /// Returns the schema string reported by the associated neuron.
    fn neuron_schema(&self) -> String;
    /// Returns the `TypeId` of the payload type handled by this synapse.
    fn payload_type_id(&self) -> TypeId;
    /// Returns the `TypeId` of the codec type used by this synapse.
    fn codec_type_id(&self) -> TypeId;
    /// Returns a boxed copy of this synapse.
    ///
    /// The implementations in this file copy `Arc` handles rather than deep-cloning, so the
    /// copy shares its underlying state with `self`.
    fn clone_to_box(&self) -> Box<dyn SynapseInternalErased + Send + Sync + 'static>;
    /// Same as [`clone_to_box`](Self::clone_to_box), but wrapped in `Arc<RwLock<..>>`.
    fn clone_to_arc(&self) -> Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>;
    /// Processes a type-erased payload and returns a boxed future with the outcome.
    ///
    /// NOTE(review): how "transduce" differs from "transmit" is defined by the typed
    /// synapse traits in `crate::synapse`, not by this file; `SimpleSynapseInternalErased`
    /// treats both identically.
    #[allow(clippy::type_complexity)]
    fn transduce_erased(
        &self,
        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>>;
    /// Delivers a type-erased payload and returns a boxed future with the outcome.
    ///
    /// In `SimpleSynapseInternalErased` the resulting `Vec<()>` holds one entry per reactant
    /// that completed successfully; the wrapper returns whatever the typed synapse yields.
    #[allow(clippy::type_complexity)]
    fn transmit_erased(
        &self,
        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>>;
    /// Registers additional reactants and error reactants with this synapse.
    fn react_erased(
        &mut self,
        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
    );
}
52
/// Object-safe, type-erased interface over an external synapse.
///
/// Implemented in this file by [`SimpleSynapseExternalErased`] and by
/// [`SynapseExternalErasedWrapper`], which forwards to a typed `SynapseExternal<T, C>`.
pub trait SynapseExternalErased: Send + Sync + 'static {
    /// Exposes the implementor as `&dyn Any`.
    fn as_any(&self) -> &dyn Any;
    /// Returns the type-erased neuron this synapse is associated with.
    fn neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static>;
    /// Returns the name reported by the associated neuron.
    fn neuron_name(&self) -> String;
    /// Returns the codec name; `SimpleSynapseExternalErased` reports the literal `"erased"`,
    /// the generic wrapper reports `C::name()`.
    fn codec_name(&self) -> String;
    /// Returns the schema string reported by the associated neuron.
    fn neuron_schema(&self) -> String;
    /// Returns the `TypeId` of the payload type handled by this synapse.
    fn payload_type_id(&self) -> TypeId;
    /// Returns the `TypeId` of the codec type used by this synapse.
    fn codec_type_id(&self) -> TypeId;
    /// Returns a boxed copy of this synapse.
    ///
    /// The implementations in this file copy `Arc` handles rather than deep-cloning, so the
    /// copy shares its underlying state with `self`.
    fn clone_to_box(&self) -> Box<dyn SynapseExternalErased + Send + Sync + 'static>;
    /// Same as [`clone_to_box`](Self::clone_to_box), but wrapped in `Arc<RwLock<..>>`.
    fn clone_to_arc(&self) -> Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>;
    /// Processes a type-erased raw payload and returns a boxed future with the outcome.
    ///
    /// In `SimpleSynapseExternalErased` the first vector of the tuple is always empty and
    /// the second holds one entry per raw reactant that completed successfully.
    /// NOTE(review): presumably the tuple is (typed results, raw results) — confirm against
    /// `SynapseExternal` in `crate::synapse`.
    #[allow(clippy::type_complexity)]
    fn transduce_erased(
        &self,
        payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
    ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static>>;
    /// Sends a type-erased raw payload and returns a boxed future with the outcome.
    ///
    /// In `SimpleSynapseExternalErased` this hands the payload bytes to the optional
    /// `RawSender` and resolves to two empty vectors on success.
    #[allow(clippy::type_complexity)]
    fn transmit_erased(
        &self,
        payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
    ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static>>;
    /// Registers additional typed, raw and error reactants with this synapse.
    fn react_erased(
        &mut self,
        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
        raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
    );
}
80
/// A truly type-erased internal synapse that doesn't rely on generic parameters.
///
/// Every field is an `Arc`, so the copies produced by `clone_to_box` / `clone_to_arc`
/// observe the same reactant lists as the synapse they were created from.
pub struct SimpleSynapseInternalErased {
    /// Erased neuron backing this synapse; supplies the name, schema and type ids.
    neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
    /// Reactants invoked with every payload passed to `transmit_erased`.
    reactants: Arc<RwLock<Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>>>,
    /// Handlers invoked for each error returned by a reactant during `transmit_erased`.
    error_reactants: Arc<RwLock<Vec<Arc<dyn ErrorReactantErased + Send + Sync + 'static>>>>,
}
87
88impl SimpleSynapseInternalErased {
89    pub fn new(neuron: Arc<dyn NeuronErased + Send + Sync + 'static>) -> Self {
90        Self {
91            neuron,
92            reactants: Arc::new(RwLock::new(Vec::new())),
93            error_reactants: Arc::new(RwLock::new(Vec::new())),
94        }
95    }
96}
97
98impl SynapseInternalErased for SimpleSynapseInternalErased {
99    fn as_any(&self) -> &dyn Any {
100        self
101    }
102
103    fn neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static> {
104        self.neuron.clone()
105    }
106
107    fn neuron_name(&self) -> String {
108        self.neuron.name()
109    }
110
111    fn codec_name(&self) -> String {
112        "erased".to_string()
113    }
114
115    fn neuron_schema(&self) -> String {
116        self.neuron.schema()
117    }
118
119    fn payload_type_id(&self) -> TypeId {
120        self.neuron.payload_type_id()
121    }
122
123    fn codec_type_id(&self) -> TypeId {
124        self.neuron.codec_type_id()
125    }
126
127    fn clone_to_box(&self) -> Box<dyn SynapseInternalErased + Send + Sync + 'static> {
128        Box::new(Self {
129            neuron: self.neuron.clone(),
130            reactants: self.reactants.clone(),
131            error_reactants: self.error_reactants.clone(),
132        })
133    }
134
135    fn clone_to_arc(&self) -> Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>> {
136        Arc::new(RwLock::new(Self {
137            neuron: self.neuron.clone(),
138            reactants: self.reactants.clone(),
139            error_reactants: self.error_reactants.clone(),
140        }))
141    }
142
143    fn transduce_erased(
144        &self,
145        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
146    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
147        self.transmit_erased(payload)
148    }
149
150    fn transmit_erased(
151        &self,
152        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
153    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
154        let reactants = self.reactants.read().clone();
155        let error_reactants = self.error_reactants.read().clone();
156        
157        Box::pin(async move {
158            let mut futures = Vec::new();
159            for reactant in reactants {
160                futures.push(reactant.react_erased(payload.clone()));
161            }
162            
163            let results = futures_util::future::join_all(futures).await;
164            let mut errors = Vec::new();
165            let mut successes = Vec::new();
166            
167            for res in results {
168                match res {
169                    Ok(_) => successes.push(()),
170                    Err(e) => errors.push(Arc::new(e)),
171                }
172            }
173            
174            if !errors.is_empty() && !error_reactants.is_empty() {
175                let mut err_futures = Vec::new();
176                for err in errors {
177                    for err_reactant in &error_reactants {
178                        err_futures.push(err_reactant.react_error_erased(err.clone(), payload.clone()));
179                    }
180                }
181                futures_util::future::join_all(err_futures).await;
182            }
183            
184            Ok(successes)
185        })
186    }
187
188    fn react_erased(
189        &mut self,
190        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
191        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
192    ) {
193        self.reactants.write().extend(reactants);
194        self.error_reactants.write().extend(error_reactants);
195    }
196}
197
/// A truly type-erased external synapse.
///
/// Every field is cheaply clonable, so the copies produced by `clone_to_box` /
/// `clone_to_arc` share the same reactant lists and sender as the original.
pub struct SimpleSynapseExternalErased {
    /// Erased neuron backing this synapse; its name doubles as the topic used when sending.
    neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
    /// Typed reactants registered through `react_erased`; stored but not invoked by the
    /// methods visible in this file.
    reactants: Arc<RwLock<Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>>>,
    /// Raw reactants invoked with every payload passed to `transduce_erased`.
    raw_reactants: Arc<RwLock<Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>>>,
    /// Error reactants registered through `react_erased`; stored but not invoked by the
    /// methods visible in this file.
    error_reactants: Arc<RwLock<Vec<Arc<dyn ErrorReactantErased + Send + Sync + 'static>>>>,
    /// Optional outbound transport; when `None`, `transmit_erased` sends nothing.
    sender: Option<Arc<dyn RawSender>>,
}
206
207impl SimpleSynapseExternalErased {
208    pub fn new(neuron: Arc<dyn NeuronErased + Send + Sync + 'static>) -> Self {
209        Self {
210            neuron,
211            reactants: Arc::new(RwLock::new(Vec::new())),
212            raw_reactants: Arc::new(RwLock::new(Vec::new())),
213            error_reactants: Arc::new(RwLock::new(Vec::new())),
214            sender: None,
215        }
216    }
217
218    pub fn new_with_sender(
219        neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
220        sender: Arc<dyn RawSender>,
221    ) -> Self {
222        Self {
223            neuron,
224            reactants: Arc::new(RwLock::new(Vec::new())),
225            raw_reactants: Arc::new(RwLock::new(Vec::new())),
226            error_reactants: Arc::new(RwLock::new(Vec::new())),
227            sender: Some(sender),
228        }
229    }
230}
231
232impl SynapseExternalErased for SimpleSynapseExternalErased {
233    fn as_any(&self) -> &dyn Any {
234        self
235    }
236
237    fn neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static> {
238        self.neuron.clone()
239    }
240
241    fn neuron_name(&self) -> String {
242        self.neuron.name()
243    }
244
245    fn codec_name(&self) -> String {
246        "erased".to_string()
247    }
248
249    fn neuron_schema(&self) -> String {
250        self.neuron.schema()
251    }
252
253    fn payload_type_id(&self) -> TypeId {
254        self.neuron.payload_type_id()
255    }
256
257    fn codec_type_id(&self) -> TypeId {
258        self.neuron.codec_type_id()
259    }
260
261    fn clone_to_box(&self) -> Box<dyn SynapseExternalErased + Send + Sync + 'static> {
262        Box::new(Self {
263            neuron: self.neuron.clone(),
264            reactants: self.reactants.clone(),
265            raw_reactants: self.raw_reactants.clone(),
266            error_reactants: self.error_reactants.clone(),
267            sender: self.sender.clone(),
268        })
269    }
270
271    fn clone_to_arc(&self) -> Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>> {
272        Arc::new(RwLock::new(Self {
273            neuron: self.neuron.clone(),
274            reactants: self.reactants.clone(),
275            raw_reactants: self.raw_reactants.clone(),
276            error_reactants: self.error_reactants.clone(),
277            sender: self.sender.clone(),
278        }))
279    }
280
281    fn transduce_erased(
282        &self,
283        payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
284    ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static>>
285    {
286        let raw_reactants = self.raw_reactants.read().clone();
287        
288        Box::pin(async move {
289            // Trigger raw reactants
290            let mut raw_futures = Vec::new();
291            for r in raw_reactants {
292                raw_futures.push(r.react_erased(payload.clone()));
293            }
294            
295            let results = futures_util::future::join_all(raw_futures).await;
296            let mut successes = Vec::new();
297            for res in results {
298                if res.is_ok() {
299                    successes.push(());
300                }
301            }
302            
303            Ok((vec![], successes))
304        })
305    }
306
307    fn transmit_erased(
308        &self,
309        payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
310    ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static>>
311    {
312        let sender = self.sender.clone();
313        let topic = self.neuron.name();
314        
315        Box::pin(async move {
316            // Send to network if sender is present
317            if let Some(s) = sender {
318                let bytes = payload.get_bytes();
319                // We need to clone the bytes data from Arc<Vec<u8>> to Vec<u8> because send takes Vec<u8>
320                // Ideally send should take &[u8] but RawSender trait takes Vec<u8> currently
321                let data = bytes.as_ref().clone(); 
322                s.send(&topic, data).await.map_err(|e| SynapseError::Dendrite(crate::dendrite::DendriteError::Other(e)))?;
323            }
324            
325            Ok((vec![], vec![]))
326        })
327    }
328
329    fn react_erased(
330        &mut self,
331        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
332        raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
333        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
334    ) {
335        self.reactants.write().extend(reactants);
336        self.raw_reactants.write().extend(raw_reactants);
337        self.error_reactants.write().extend(error_reactants);
338    }
339}
340
/// Wrapper that implements SynapseInternalErased for any concrete SynapseInternal
///
/// `T` is the payload type, `C` the codec and `S` the wrapped synapse type.
pub struct SynapseInternalErasedWrapper<T, C, S> {
    /// The wrapped typed synapse; shared (not deep-copied) by `clone_to_box` / `clone_to_arc`.
    synapse: Arc<RwLock<S>>,
    /// Initialised empty by `new` and only cloned by the code visible in this file.
    erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
    /// Ties the otherwise unused `T` and `C` parameters to the struct.
    _phantom: PhantomData<(T, C)>,
}
347
348impl<T, C, S> SynapseInternalErasedWrapper<T, C, S>
349where
350    T: Send + Sync + 'static,
351    C: Codec<T> + CodecName + Send + Sync + 'static,
352    S: SynapseInternal<T, C> + Send + Sync + 'static,
353{
354    pub fn new(synapse: S) -> Self {
355        Self {
356            synapse: Arc::new(RwLock::new(synapse)),
357            erased_reactants: Vec::new(),
358            _phantom: PhantomData,
359        }
360    }
361
362    /// Create a type-erased synapse internal from a correctly typed synapse internal
363    pub fn from_typed_synapse(
364        synapse: S,
365    ) -> Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>
366    where
367        T: 'static,
368        C: 'static,
369        S: 'static,
370    {
371        Arc::new(RwLock::new(Self::new(synapse)))
372    }
373
374    /// Safely attempt to downcast an erased synapse to its concrete type.
375    pub fn to_typed_synapse<U, D, R>(&self) -> ErasureResult<Arc<RwLock<R>>>
376    where
377        U: Send + Sync + 'static,
378        D: Codec<U> + CodecName + Send + Sync + 'static,
379        R: SynapseInternal<U, D> + Send + Sync + 'static,
380    {
381        if TypeId::of::<T>() == TypeId::of::<U>() && TypeId::of::<C>() == TypeId::of::<D>() {
382            // Safety: Types are checked by TypeId
383            unsafe {
384                Ok(std::mem::transmute::<Arc<RwLock<S>>, Arc<RwLock<R>>>(
385                    self.synapse.clone(),
386                ))
387            }
388        } else {
389            Err(ErasureError::SynapseTypeMismatch {
390                expected_payload_type: TypeId::of::<U>(),
391                expected_codec_type: TypeId::of::<D>(),
392                actual_payload_type: TypeId::of::<T>(),
393                actual_codec_type: TypeId::of::<C>(),
394            })
395        }
396    }
397}
398
399impl<T, C, S> SynapseInternalErased for SynapseInternalErasedWrapper<T, C, S>
400where
401    T: Send + Sync + 'static,
402    C: Codec<T> + CodecName + Send + Sync + 'static,
403    S: SynapseInternal<T, C> + Send + Sync + 'static,
404{
405    fn as_any(&self) -> &dyn Any {
406        self
407    }
408
409    fn neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static> {
410        erase_neuron(self.synapse.read().neuron())
411    }
412
413    fn transduce_erased(
414        &self,
415        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
416    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
417        // Try to downcast the erased payload to its concrete wrapper safely
418        if let Some(wrapper) = payload
419            .as_any()
420            .downcast_ref::<PayloadErasedWrapper<T, C>>()
421        {
422            let typed_payload = wrapper.get_typed_payload();
423            let synapse_arc = self.synapse.clone();
424
425            // Extract the future before the await point to avoid holding the guard across awaits
426            let future = {
427                let synapse = synapse_arc.read();
428                synapse.transduce(typed_payload)
429            };
430
431            Box::pin(future)
432        } else {
433            // If the types don't match, return an error
434            let neuron_name = self.neuron_name();
435            Box::pin(async move { Err(SynapseError::NeuronTypeConversion { neuron_name }) })
436        }
437    }
438
439    fn transmit_erased(
440        &self,
441        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
442    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
443        // Try to downcast the erased payload to its concrete wrapper safely
444        if let Some(wrapper) = payload
445            .as_any()
446            .downcast_ref::<PayloadErasedWrapper<T, C>>()
447        {
448            let typed_payload = wrapper.get_typed_payload();
449            let synapse_arc = self.synapse.clone();
450
451            // Extract the future before the await point to avoid holding the guard across awaits
452            let future = {
453                let synapse = synapse_arc.read();
454                synapse.transmit(typed_payload)
455            };
456
457            Box::pin(future)
458        } else {
459            // If the types don't match, return an error
460            let neuron_name = self.neuron_name();
461            Box::pin(async move { Err(SynapseError::NeuronTypeConversion { neuron_name }) })
462        }
463    }
464
465    fn react_erased(
466        &mut self,
467        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
468        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
469    ) {
470        if reactants.is_empty() && error_reactants.is_empty() {
471            return; // No reactants to process
472        }
473
474        let typed_reactants: Vec<_> = reactants
475            .into_iter()
476            .filter_map(|erased_reactant| {
477                // First, check if the types match to avoid unnecessary work.
478                if erased_reactant.payload_type_id() != TypeId::of::<T>()
479                    || erased_reactant.codec_type_id() != TypeId::of::<C>()
480                {
481                    return None;
482                }
483
484                // The types match, so clone into an `Any` Arc.
485                let any_arc = erased_reactant.clone_to_any();
486
487                // Attempt to downcast to the typed Arc we need.
488                any_arc
489                    .downcast::<Arc<dyn Reactant<T, C> + Send + Sync + 'static>>()
490                    .ok()
491                    .map(|boxed_arc| (*boxed_arc).clone())
492            })
493            .collect();
494
495        let typed_error_reactants: Vec<_> = error_reactants
496            .into_iter()
497            .filter_map(|erased_reactant| {
498                if erased_reactant.payload_type_id() != TypeId::of::<T>()
499                    || erased_reactant.codec_type_id() != TypeId::of::<C>()
500                {
501                    return None;
502                }
503                let any_arc = erased_reactant.clone_to_any();
504                any_arc
505                    .downcast::<Arc<dyn ErrorReactant<T, C> + Send + Sync + 'static>>()
506                    .ok()
507                    .map(|boxed_arc| (*boxed_arc).clone())
508            })
509            .collect();
510
511        // Now we can call react on the underlying synapse with mutable access through RwLock
512        if !typed_reactants.is_empty() || !typed_error_reactants.is_empty() {
513            let _ = self
514                .synapse
515                .write()
516                .react(typed_reactants, typed_error_reactants);
517        }
518    }
519
520    fn neuron_name(&self) -> String {
521        self.synapse.read().neuron().name()
522    }
523
524    fn codec_name(&self) -> String {
525        C::name().to_string()
526    }
527
528    fn neuron_schema(&self) -> String {
529        self.synapse.read().neuron().schema()
530    }
531
532    fn payload_type_id(&self) -> TypeId {
533        TypeId::of::<T>()
534    }
535
536    fn codec_type_id(&self) -> TypeId {
537        TypeId::of::<C>()
538    }
539
540    fn clone_to_box(&self) -> Box<dyn SynapseInternalErased + Send + Sync + 'static> {
541        Box::new(Self {
542            synapse: self.synapse.clone(),
543            erased_reactants: self.erased_reactants.clone(),
544            _phantom: PhantomData,
545        })
546    }
547
548    fn clone_to_arc(&self) -> Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>> {
549        Arc::new(RwLock::new(Self {
550            synapse: self.synapse.clone(),
551            erased_reactants: self.erased_reactants.clone(),
552            _phantom: PhantomData,
553        }))
554    }
555}
556
/// Wrapper that implements SynapseExternalErased for any concrete SynapseExternal
///
/// `T` is the payload type, `C` the codec and `S` the wrapped synapse type.
pub struct SynapseExternalErasedWrapper<T, C, S> {
    /// The wrapped typed synapse; shared (not deep-copied) by `clone_to_box` / `clone_to_arc`.
    synapse: Arc<RwLock<S>>,
    /// Ties the otherwise unused `T` and `C` parameters to the struct.
    _phantom: PhantomData<(T, C)>,
}
562
563impl<T, C, S> SynapseExternalErasedWrapper<T, C, S>
564where
565    T: Send + Sync + 'static,
566    C: Codec<T> + CodecName + Send + Sync + 'static,
567    S: SynapseExternal<T, C> + Send + Sync + 'static,
568{
569    pub fn new(synapse: S) -> Self {
570        Self {
571            synapse: Arc::new(RwLock::new(synapse)),
572            _phantom: PhantomData,
573        }
574    }
575
576    /// Create a type-erased external synapse from a correctly typed external synapse
577    pub fn from_typed_synapse(
578        synapse: S,
579    ) -> Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>
580    where
581        T: 'static,
582        C: 'static,
583        S: 'static,
584    {
585        Arc::new(RwLock::new(Self::new(synapse)))
586    }
587
588    /// Safely attempt to downcast an erased external synapse to its concrete type.
589    pub fn to_typed_synapse<U, D, R>(&self) -> ErasureResult<Arc<RwLock<R>>>
590    where
591        U: Send + Sync + 'static,
592        D: Codec<U> + CodecName + Send + Sync + 'static,
593        R: SynapseExternal<U, D> + Send + Sync + 'static,
594    {
595        if TypeId::of::<T>() == TypeId::of::<U>() && TypeId::of::<C>() == TypeId::of::<D>() {
596            // Safety: Types are checked by TypeId
597            unsafe {
598                Ok(std::mem::transmute::<Arc<RwLock<S>>, Arc<RwLock<R>>>(
599                    self.synapse.clone(),
600                ))
601            }
602        } else {
603            Err(ErasureError::SynapseTypeMismatch {
604                expected_payload_type: TypeId::of::<U>(),
605                expected_codec_type: TypeId::of::<D>(),
606                actual_payload_type: TypeId::of::<T>(),
607                actual_codec_type: TypeId::of::<C>(),
608            })
609        }
610    }
611}
612
613impl<T, C, S> SynapseExternalErased for SynapseExternalErasedWrapper<T, C, S>
614where
615    T: Send + Sync + 'static,
616    C: Codec<T> + CodecName + Send + Sync + 'static,
617    S: SynapseExternal<T, C> + Send + Sync + 'static,
618{
619    fn as_any(&self) -> &dyn Any {
620        self
621    }
622
623    fn neuron(&self) -> Arc<dyn NeuronErased + Send + Sync + 'static> {
624        erase_neuron(self.synapse.read().neuron())
625    }
626
627    fn transduce_erased(
628        &self,
629        payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
630    ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static>>
631    {
632        // Try to downcast the erased payload to its concrete wrapper safely
633        if let Some(wrapper) = payload
634            .as_any()
635            .downcast_ref::<PayloadRawErasedWrapper<T, C>>()
636        {
637            let typed_payload = wrapper.get_payload_raw();
638            let synapse_arc = self.synapse.clone();
639
640            // Extract the future before the await point to avoid holding the guard across awaits
641            let future = {
642                let synapse = synapse_arc.read();
643                synapse.transduce(typed_payload)
644            };
645
646            Box::pin(future)
647        } else {
648            // It might be a SimplePayloadRawErased or another implementation.
649            // We create a typed PayloadRaw using the synapse's neuron.
650            let synapse_arc = self.synapse.clone();
651            let bytes = payload.get_bytes();
652            let trace = payload.get_trace_context();
653
654            // Acquire lock to get neuron and create the future
655            let future_res = {
656                let synapse = synapse_arc.read();
657                let typed_payload = Arc::new(PayloadRaw::from_parts(
658                    bytes,
659                    synapse.neuron().clone(),
660                    trace,
661                ));
662                synapse.transduce(typed_payload)
663            };
664
665            Box::pin(future_res)
666        }
667    }
668
669    fn transmit_erased(
670        &self,
671        payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
672    ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static>>
673    {
674        // Try to downcast the erased payload to its concrete wrapper safely
675        if let Some(wrapper) = payload
676            .as_any()
677            .downcast_ref::<PayloadRawErasedWrapper<T, C>>()
678        {
679            let typed_payload = wrapper.get_payload_raw();
680            let synapse_arc = self.synapse.clone();
681
682            // Extract the future before the await point to avoid holding the guard across awaits
683            let future = {
684                let synapse = synapse_arc.read();
685                synapse.transmit(typed_payload)
686            };
687
688            Box::pin(future)
689        } else {
690            // It might be a SimplePayloadRawErased or another implementation.
691            // We create a typed PayloadRaw using the synapse's neuron.
692            let synapse_arc = self.synapse.clone();
693            let bytes = payload.get_bytes();
694            let trace = payload.get_trace_context();
695
696            // Acquire lock to get neuron and create the future
697            let future_res = {
698                let synapse = synapse_arc.read();
699                let typed_payload = Arc::new(PayloadRaw::from_parts(
700                    bytes,
701                    synapse.neuron().clone(),
702                    trace,
703                ));
704                synapse.transmit(typed_payload)
705            };
706
707            Box::pin(future_res)
708        }
709    }
710
711    fn react_erased(
712        &mut self,
713        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
714        raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
715        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
716    ) {
717        if reactants.is_empty() && raw_reactants.is_empty() && error_reactants.is_empty() {
718            return; // No reactants to process
719        }
720
721        let typed_reactants: Vec<_> = reactants
722            .into_iter()
723            .filter_map(|erased_reactant| {
724                // First, check if the types match to avoid unnecessary work.
725                if erased_reactant.payload_type_id() != TypeId::of::<T>()
726                    || erased_reactant.codec_type_id() != TypeId::of::<C>()
727                {
728                    return None;
729                }
730
731                // The types match, so clone into an `Any` Arc.
732                let any_arc = erased_reactant.clone_to_any();
733
734                // Attempt to downcast to the typed Arc we need.
735                any_arc
736                    .downcast::<Arc<dyn Reactant<T, C> + Send + Sync + 'static>>()
737                    .ok()
738                    .map(|boxed_arc| (*boxed_arc).clone())
739            })
740            .collect();
741
742        let typed_raw_reactants: Vec<_> = raw_reactants
743            .into_iter()
744            .filter_map(|erased_reactant| {
745                // First, check if the types match to avoid unnecessary work.
746                if erased_reactant.payload_type_id() != TypeId::of::<T>()
747                    || erased_reactant.codec_type_id() != TypeId::of::<C>()
748                {
749                    return None;
750                }
751
752                let any_arc = erased_reactant.clone_to_any();
753                any_arc
754                    .downcast::<Arc<dyn ReactantRaw<T, C> + Send + Sync + 'static>>()
755                    .ok()
756                    .map(|boxed_arc| (*boxed_arc).clone())
757            })
758            .collect();
759
760        let typed_error_reactants: Vec<_> = error_reactants
761            .into_iter()
762            .filter_map(|erased_reactant| {
763                if erased_reactant.payload_type_id() != TypeId::of::<T>()
764                    || erased_reactant.codec_type_id() != TypeId::of::<C>()
765                {
766                    return None;
767                }
768                let any_arc = erased_reactant.clone_to_any();
769                any_arc
770                    .downcast::<Arc<dyn ErrorReactant<T, C> + Send + Sync + 'static>>()
771                    .ok()
772                    .map(|boxed_arc| (*boxed_arc).clone())
773            })
774            .collect();
775
776        // Now we can call react on the underlying synapse with mutable access through RwLock
777        if !typed_reactants.is_empty()
778            || !typed_raw_reactants.is_empty()
779            || !typed_error_reactants.is_empty()
780        {
781            let _ = self
782                .synapse
783                .write()
784                .react(typed_reactants, typed_raw_reactants, typed_error_reactants);
785        }
786    }
787
788    fn neuron_name(&self) -> String {
789        self.synapse.read().neuron().name()
790    }
791
792    fn codec_name(&self) -> String {
793        C::name().to_string()
794    }
795
796    fn neuron_schema(&self) -> String {
797        self.synapse.read().neuron().schema()
798    }
799
800    fn payload_type_id(&self) -> TypeId {
801        TypeId::of::<T>()
802    }
803
804    fn codec_type_id(&self) -> TypeId {
805        TypeId::of::<C>()
806    }
807
808    fn clone_to_box(&self) -> Box<dyn SynapseExternalErased + Send + Sync + 'static> {
809        Box::new(Self {
810            synapse: self.synapse.clone(),
811            _phantom: PhantomData,
812        })
813    }
814
815    fn clone_to_arc(&self) -> Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>> {
816        Arc::new(RwLock::new(Self {
817            synapse: self.synapse.clone(),
818            _phantom: PhantomData,
819        }))
820    }
821}
822
823/// Convenience function to erase a concrete internal synapse
824pub fn erase_synapse_internal<T, C, S>(
825    synapse: S,
826) -> Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>
827where
828    T: Send + Sync + 'static,
829    C: Codec<T> + CodecName + Send + Sync + 'static,
830    S: SynapseInternal<T, C> + Send + Sync + 'static,
831{
832    SynapseInternalErasedWrapper::from_typed_synapse(synapse)
833}
834
835/// Convenience function to erase a concrete external synapse
836pub fn erase_synapse_external<T, C, S>(
837    synapse: S,
838) -> Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>
839where
840    T: Send + Sync + 'static,
841    C: Codec<T> + CodecName + Send + Sync + 'static,
842    S: SynapseExternal<T, C> + Send + Sync + 'static,
843{
844    SynapseExternalErasedWrapper::from_typed_synapse(synapse)
845}