// plexor_core/plexus.rs

// Copyright 2025 Alecks Gates
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use crate::axon::AxonError;
8use crate::codec::{Codec, CodecName};
9use crate::erasure::neuron::{NeuronErased, erase_neuron};
10use crate::erasure::payload::{PayloadErased, erase_payload, erase_payload_raw};
11use crate::erasure::reactant::{
12    ErrorReactantErased, ReactantErased, ReactantRawErased, erase_reactant, erase_reactant_raw,
13};
14use crate::ganglion::{
15    Ganglion, GanglionError, GanglionExternal, GanglionInprocess, GanglionInternal,
16};
17use crate::logging::LogTrace;
18use crate::neuron::Neuron;
19use crate::payload::{Payload, PayloadRaw};
20use crate::reactant::{Reactant, ReactantRaw};
21use futures_util::future::join_all;
22use itertools::Itertools;
23use moka::future::Cache;
24use std::collections::{HashMap, HashSet};
25use std::future::Future;
26use std::marker::PhantomData;
27use std::pin::Pin;
28use std::sync::Arc;
29use std::time::Duration;
30use thiserror::Error;
31use tokio::sync::{Mutex, RwLock};
32use tracing::{Instrument};
33use uuid::Uuid;
34
35#[derive(Error, Debug)]
36pub enum PlexusError {
37    #[error("Ganglion error: {0}")]
38    Ganglion(#[from] GanglionError),
39    #[error("Failed to acquire lock on external ganglia")]
40    ExternalGangliaLock,
41    #[error("Failed to acquire lock on internal ganglia")]
42    InternalGangliaLock,
43    #[error("Failed to acquire lock on neurons")]
44    NeuronsLock,
45    #[error("Failed to acquire lock on reactant factories")]
46    ReactantFactoriesLock,
47    #[error("Failed to acquire lock on neuron ganglia")]
48    NeuronGangliaLock,
49    #[error("Failed to acquire lock on reactions")]
50    ReactionsLock,
51    #[error("Neuron adaptation failed for {neuron_name}")]
52    NeuronAdaptation { neuron_name: String },
53    #[error("Reactant creation failed for {neuron_name}")]
54    ReactantCreation { neuron_name: String },
55    #[error("Transmission failed")]
56    Transmission,
57}
58
59impl From<AxonError> for PlexusError {
60    fn from(error: AxonError) -> Self {
61        match error {
62            AxonError::GanglionError(e) => PlexusError::Ganglion(e),
63            AxonError::NeuronNotAdapted {
64                neuron_name,
65                ganglion_name,
66                ganglion_id,
67            } => PlexusError::Ganglion(GanglionError::SynapseNotFound {
68                neuron_name,
69                ganglion_name,
70                ganglion_id,
71            }),
72            AxonError::SynapseLock {
73                neuron_name,
74                ganglion_name,
75                ganglion_id,
76            } => PlexusError::Ganglion(GanglionError::SynapseLock {
77                neuron_name,
78                ganglion_name,
79                ganglion_id,
80            }),
81            AxonError::Transmit {
82                neuron_name,
83                ganglion_name,
84                ganglion_id,
85                message,
86            } => PlexusError::Ganglion(GanglionError::Transmit {
87                neuron_name,
88                ganglion_name,
89                ganglion_id,
90                message,
91            }),
92            AxonError::Encode {
93                neuron_name,
94                ganglion_name,
95                ganglion_id,
96            } => PlexusError::Ganglion(GanglionError::Encode {
97                neuron_name,
98                ganglion_name,
99                ganglion_id,
100            }),
101            AxonError::Decode {
102                neuron_name,
103                ganglion_name,
104                ganglion_id,
105            } => PlexusError::Ganglion(GanglionError::Decode {
106                neuron_name,
107                ganglion_name,
108                ganglion_id,
109            }),
110            AxonError::TransmissionTimeout => PlexusError::Transmission,
111        }
112    }
113}
114
115pub struct PlexusReactantFactories {
116    pub internal_factory: Arc<dyn ErasedInternalReactantFactory + Send + Sync>,
117    pub external_internal_factory: Arc<dyn ErasedExternalInternalReactantFactory + Send + Sync>,
118    pub external_external_factory: Arc<dyn ErasedExternalExternalReactantFactory + Send + Sync>,
119}
120
121/// A Plexus manages a network of Ganglia, arranging automatic transmission of types between different Ganglia.
122#[allow(clippy::type_complexity)]
123pub struct Plexus {
124    /// Unique identifier for this plexus instance
125    id: Uuid,
126
127    /// In-process ganglion for handling local neurons
128    inproc_ganglion: Arc<Mutex<GanglionInprocess>>,
129
130    /// External ganglia that handle encoded payloads, mapped by their unique_id
131    external_ganglia:
132        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>>,
133
134    /// Internal ganglia that handle decoded payloads, mapped by their unique_id
135    internal_ganglia:
136        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>>,
137
138    /// Set of neurons managed by this plexus, mapped by their name
139    neurons: Arc<RwLock<HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>>>,
140
141    /// Mapping of neuron strongly-typed reactant factories
142    reactant_factories: Arc<RwLock<HashMap<String, PlexusReactantFactories>>>,
143
144    /// Mapping of neuron ganglion pairs
145    neuron_ganglia: Arc<RwLock<HashSet<(String, Uuid)>>>,
146
147    /// Reactions tracking for preventing loops - maps span_id to a set of ganglia IDs it has processed.
148    /// Uses a Cache with TTL to automatically expire old reactions.
149    reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
150
151    /// Neurons that this plexus will handle (if empty, handles all)
152    relevant_neurons: HashSet<String>,
153
154    /// Neurons that this plexus will ignore
155    ignored_neurons: HashSet<String>,
156}
157
158impl Plexus {
159    /// Returns a read guard to the neurons map
160    pub async fn neurons(
161        &self,
162    ) -> tokio::sync::RwLockReadGuard<
163        '_,
164        HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>,
165    > {
166        self.neurons.read().await
167    }
168
169    /// Create a new Plexus with the given ganglia and neuron filters
170    pub async fn new(
171        relevant_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
172        ignored_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
173    ) -> Self {
174        let inproc_ganglion = Arc::new(Mutex::new(GanglionInprocess::new()));
175
176        let relevant_neuron_names = relevant_neurons
177            .iter()
178            .map(|n| n.name())
179            .collect::<HashSet<String>>();
180
181        let ignored_neuron_names = ignored_neurons
182            .iter()
183            .map(|n| n.name())
184            .collect::<HashSet<String>>();
185
186        let reactions = Cache::builder()
187            .time_to_idle(Duration::from_secs(60)) // Default TTL 60s
188            .build();
189
190        let plexus = Self {
191            id: Uuid::now_v7(),
192            inproc_ganglion: inproc_ganglion.clone(),
193            external_ganglia: Arc::new(RwLock::new(HashMap::new())),
194            internal_ganglia: Arc::new(RwLock::new(HashMap::new())),
195            neurons: Arc::new(RwLock::new(HashMap::new())),
196            reactant_factories: Arc::new(RwLock::new(HashMap::new())),
197            neuron_ganglia: Arc::new(RwLock::new(HashSet::new())),
198            reactions,
199            relevant_neurons: relevant_neuron_names,
200            ignored_neurons: ignored_neuron_names,
201        };
202
203        let ganglion_id = {
204            let ganglion_guard = inproc_ganglion.lock().await;
205            ganglion_guard.unique_id()
206        };
207        plexus
208            .internal_ganglia
209            .write()
210            .await
211            .insert(ganglion_id, inproc_ganglion);
212
213        plexus
214    }
215
216    /// Helper to create an Arc<Mutex<Plexus>>
217    pub async fn new_shared(
218        relevant_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
219        ignored_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
220    ) -> Arc<Mutex<Self>> {
221        Arc::new(Mutex::new(
222            Self::new(relevant_neurons, ignored_neurons).await,
223        ))
224    }
225
226    /// Add a ganglion to the plexus
227    pub async fn infuse_ganglion<G>(&mut self, ganglion: Arc<Mutex<G>>) -> Result<(), PlexusError>
228    where
229        G: GanglionInternal + Ganglion + Send + Sync + 'static,
230    {
231        let ganglion_id = {
232            let ganglion_guard = ganglion.lock().await;
233            ganglion_guard.unique_id()
234        };
235
236        self.internal_ganglia
237            .write()
238            .await
239            .insert(ganglion_id, ganglion.clone());
240
241        self.update_neuron_ganglia().await?;
242        Ok(())
243    }
244
245    /// Add an external ganglion to the plexus
246    pub async fn infuse_external_ganglion<G>(
247        &mut self,
248        ganglion: Arc<Mutex<G>>,
249    ) -> Result<(), PlexusError>
250    where
251        G: GanglionExternal + Send + Sync + 'static,
252    {
253        let ganglion_id = {
254            let ganglion_guard = ganglion.lock().await;
255            ganglion_guard.unique_id()
256        };
257
258        self.external_ganglia
259            .write()
260            .await
261            .insert(ganglion_id, ganglion);
262
263        self.update_neuron_ganglia().await?;
264        Ok(())
265    }
266
267    /// Remove a ganglion from the plexus by its ID
268    pub async fn excise_ganglion_by_id(
269        &mut self,
270        ganglion_id: Uuid,
271    ) -> Result<Option<Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>, PlexusError> {
272        let removed = self.internal_ganglia.write().await.remove(&ganglion_id);
273
274        if removed.is_some() {
275            self.update_neuron_ganglia().await?;
276        }
277
278        Ok(removed)
279    }
280
281    /// Remove a ganglion from the plexus
282    pub async fn excise_ganglion<G>(
283        &mut self,
284        ganglion: Arc<Mutex<G>>,
285    ) -> Result<Option<Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>, PlexusError>
286    where
287        G: GanglionInternal + Send + Sync + ?Sized + 'static,
288    {
289        let ganglion_id = {
290            let guard = ganglion.lock().await;
291            guard.unique_id()
292        };
293        self.excise_ganglion_by_id(ganglion_id).await
294    }
295
296    /// Remove an external ganglion from the plexus by its ID
297    pub async fn excise_external_ganglion_by_id(
298        &mut self,
299        ganglion_id: Uuid,
300    ) -> Result<Option<Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>, PlexusError> {
301        let removed = self.external_ganglia.write().await.remove(&ganglion_id);
302
303        if removed.is_some() {
304            self.update_neuron_ganglia().await?;
305        }
306
307        Ok(removed)
308    }
309
310    /// Remove an external ganglion from the plexus
311    pub async fn excise_external_ganglion<G>(
312        &mut self,
313        ganglion: Arc<Mutex<G>>,
314    ) -> Result<Option<Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>, PlexusError>
315    where
316        G: GanglionExternal + Send + Sync + ?Sized + 'static,
317    {
318        let ganglion_id = {
319            let guard = ganglion.lock().await;
320            guard.unique_id()
321        };
322        self.excise_external_ganglion_by_id(ganglion_id).await
323    }
324
325    /// Update neurons with a new neuron
326    pub async fn update_neurons(
327        &self,
328        neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
329    ) -> Result<(), PlexusError> {
330        let mut neurons = self.neurons.write().await;
331        neurons.insert(neuron.name(), neuron);
332        drop(neurons);
333
334        self.update_neuron_ganglia().await?;
335        Ok(())
336    }
337
338    /// Update neuron-ganglion mappings for all neurons and ganglia
339    async fn update_neuron_ganglia(&self) -> Result<(), PlexusError> {
340        update_neuron_ganglia_internal(
341            &self.neurons,
342            &self.internal_ganglia,
343            &self.external_ganglia,
344            &self.neuron_ganglia,
345            &self.reactant_factories,
346            &self.reactions,
347        )
348        .await
349    }
350}
351
352impl Ganglion for Plexus {
353    fn capable<T, C>(&mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
354    where
355        C: Codec<T> + CodecName + Send + Sync + 'static,
356        T: Send + Sync + 'static,
357    {
358        let neuron_name = neuron.name();
359
360        if !self.relevant_neurons.is_empty() && !self.relevant_neurons.contains(&neuron_name) {
361            return false;
362        }
363
364        if self.ignored_neurons.contains(&neuron_name) {
365            return false;
366        }
367
368        true
369    }
370
371    fn adapt<T, C>(
372        &mut self,
373        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
374    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>
375    where
376        C: Codec<T> + CodecName + Send + Sync + 'static,
377        T: Send + Sync + 'static,
378    {
379        tracing::debug!(neuron = %neuron.name(), "Plexus::adapt - Adapting neuron");
380
381        if !self.capable(neuron.clone()) {
382            return Box::pin(async move { Ok(()) });
383        }
384
385        let inproc_ganglion = self.inproc_ganglion.clone();
386        let erased_neuron = erase_neuron(neuron.clone());
387        let reactions = self.reactions.clone();
388        let reactant_factories = self.reactant_factories.clone();
389        let neurons = self.neurons.clone();
390        let id = self.id;
391        let internal_ganglia = self.internal_ganglia.clone();
392        let external_ganglia = self.external_ganglia.clone();
393        let neuron_ganglia = self.neuron_ganglia.clone();
394
395        Box::pin(async move {
396            let future = {
397                let mut inproc_ganglion_guard = inproc_ganglion.lock().await;
398                inproc_ganglion_guard.adapt(neuron.clone())
399            };
400
401            let result = future.await;
402
403            let factories = PlexusReactantFactories {
404                internal_factory: Arc::new(PlexusInternalReactantFactory::<T, C>::new()),
405                external_internal_factory: Arc::new(
406                    PlexusExternalInternalReactantFactory::<T, C>::new(),
407                ),
408                external_external_factory: Arc::new(
409                    PlexusExternalExternalReactantFactory::<T, C>::new(),
410                ),
411            };
412
413            reactant_factories
414                .write()
415                .await
416                .insert(neuron.name(), factories);
417
418            if let Err(plexus_error) = update_neurons_internal(
419                &neurons,
420                &internal_ganglia,
421                &external_ganglia,
422                &neuron_ganglia,
423                &reactant_factories,
424                &reactions,
425                erased_neuron,
426            )
427            .await
428            {
429                return Err(GanglionError::from_plexus_error(
430                    plexus_error,
431                    neuron.name(),
432                    "Plexus".to_string(),
433                    id,
434                ));
435            }
436
437            result
438        })
439    }
440}
441
442#[allow(clippy::type_complexity)]
443async fn update_neurons_internal(
444    neurons_map: &Arc<RwLock<HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>>>,
445    internal_ganglia: &Arc<
446        RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
447    >,
448    external_ganglia: &Arc<
449        RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
450    >,
451    neuron_ganglia: &Arc<RwLock<HashSet<(String, Uuid)>>>,
452    reactant_factories: &Arc<RwLock<HashMap<String, PlexusReactantFactories>>>,
453    reactions: &Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
454    neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
455) -> Result<(), PlexusError> {
456    let mut neurons = neurons_map.write().await;
457    neurons.insert(neuron.name(), neuron);
458    drop(neurons);
459
460    update_neuron_ganglia_internal(
461        neurons_map,
462        internal_ganglia,
463        external_ganglia,
464        neuron_ganglia,
465        reactant_factories,
466        reactions,
467    )
468    .await?;
469
470    Ok(())
471}
472
473#[allow(clippy::type_complexity)]
474async fn update_neuron_ganglia_internal(
475    neurons_map: &Arc<RwLock<HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>>>,
476    internal_ganglia_map: &Arc<
477        RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
478    >,
479    external_ganglia_map: &Arc<
480        RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
481    >,
482    neuron_ganglia_set: &Arc<RwLock<HashSet<(String, Uuid)>>>,
483    reactant_factories_map: &Arc<RwLock<HashMap<String, PlexusReactantFactories>>>,
484    reactions: &Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
485) -> Result<(), PlexusError> {
486    let neurons = neurons_map.read().await;
487    let internal_ganglia = internal_ganglia_map.read().await;
488    let external_ganglia = external_ganglia_map.read().await;
489
490    let all_internal_neuron_ganglia: HashSet<(String, Uuid)> = neurons
491        .iter()
492        .cartesian_product(internal_ganglia.iter())
493        .map(|((neuron_name, _neuron), (ganglion_id, _ganglion_mutex))| {
494            (neuron_name.clone(), *ganglion_id)
495        })
496        .collect();
497
498    let all_external_neuron_ganglia: HashSet<(String, Uuid)> = neurons
499        .iter()
500        .cartesian_product(external_ganglia.iter())
501        .map(|((neuron_name, _neuron), (ganglion_id, _ganglion_mutex))| {
502            (neuron_name.clone(), *ganglion_id)
503        })
504        .collect();
505
506    let mut neuron_ganglia = neuron_ganglia_set.write().await;
507    let new_internal_combinations: Vec<(String, Uuid)> = all_internal_neuron_ganglia
508        .difference(&neuron_ganglia)
509        .cloned()
510        .collect();
511
512    let new_external_combinations: Vec<(String, Uuid)> = all_external_neuron_ganglia
513        .difference(&neuron_ganglia)
514        .cloned()
515        .collect();
516
517    *neuron_ganglia = all_internal_neuron_ganglia
518        .union(&all_external_neuron_ganglia)
519        .cloned()
520        .collect();
521
522    drop(neuron_ganglia);
523
524    let mut internal_reactants_by_ganglion: HashMap<
525        (String, Uuid),
526        Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
527    > = HashMap::new();
528
529    for (neuron_name, ganglion_id) in new_internal_combinations {
530        let internal_reactant = reactant_factories_map
531            .read()
532            .await
533            .get(&neuron_name)
534            .ok_or_else(|| PlexusError::ReactantCreation {
535                neuron_name: neuron_name.clone(),
536            })?
537            .internal_factory
538            .create_reactant(
539                ganglion_id,
540                internal_ganglia_map.clone(),
541                external_ganglia_map.clone(),
542                reactions.clone(),
543            );
544
545        internal_reactants_by_ganglion
546            .entry((neuron_name, ganglion_id))
547            .or_default()
548            .push(internal_reactant);
549    }
550
551    for ((neuron_name, ganglion_id), reactants) in internal_reactants_by_ganglion {
552        if let Some(ganglion_mutex) = internal_ganglia.get(&ganglion_id) {
553            let mut ganglion = ganglion_mutex.lock().await;
554            ganglion.react(neuron_name, reactants, vec![]).await?;
555        }
556    }
557
558    #[allow(clippy::type_complexity)]
559    let mut external_reactants_by_ganglion: HashMap<
560        (String, Uuid),
561        (
562            Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
563            Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
564        ),
565    > = HashMap::new();
566
567    for (neuron_name, ganglion_id) in new_external_combinations {
568        let external_internal_reactant = reactant_factories_map
569            .read()
570            .await
571            .get(&neuron_name)
572            .ok_or_else(|| PlexusError::ReactantCreation {
573                neuron_name: neuron_name.clone(),
574            })?
575            .external_internal_factory
576            .create_reactant(ganglion_id, internal_ganglia_map.clone(), reactions.clone());
577
578        let external_external_reactant = reactant_factories_map
579            .read()
580            .await
581            .get(&neuron_name)
582            .ok_or_else(|| PlexusError::ReactantCreation {
583                neuron_name: neuron_name.clone(),
584            })?
585            .external_external_factory
586            .create_reactant(ganglion_id, external_ganglia_map.clone(), reactions.clone());
587
588        let entry = external_reactants_by_ganglion
589            .entry((neuron_name.clone(), ganglion_id))
590            .or_default();
591        entry.0.push(external_internal_reactant);
592        entry.1.push(external_external_reactant);
593    }
594
595    for ((neuron_name, ganglion_id), (reactants, reactants_raw)) in external_reactants_by_ganglion {
596        if let Some(ganglion_mutex) = external_ganglia.get(&ganglion_id) {
597            let mut ganglion = ganglion_mutex.lock().await;
598            ganglion
599                .react(neuron_name, reactants, reactants_raw, vec![])
600                .await?;
601        }
602    }
603
604    Ok(())
605}
606
607impl GanglionInternal for Plexus {
608    fn transmit(
609        &mut self,
610        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
611    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>> {
612        let inproc_ganglion = self.inproc_ganglion.clone();
613
614        Box::pin(async move {
615            let future = {
616                let mut ganglion = inproc_ganglion.lock().await;
617                ganglion.transmit(payload)
618            };
619
620            future.await
621        })
622    }
623
624    fn react(
625        &mut self,
626        neuron_name: String,
627        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
628        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
629    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
630        let inproc_ganglion = self.inproc_ganglion.clone();
631
632        Box::pin(async move {
633            let future = {
634                let mut ganglion = inproc_ganglion.lock().await;
635                ganglion.react(neuron_name, reactants, error_reactants)
636            };
637
638            future.await
639        })
640    }
641
642    fn react_many(
643        &mut self,
644        reactions: HashMap<
645            String,
646            (
647                Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
648                Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
649            ),
650        >,
651    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
652        let inproc_ganglion = self.inproc_ganglion.clone();
653
654        Box::pin(async move {
655            let mut ganglion = inproc_ganglion.lock().await;
656            ganglion.react_many(reactions).await
657        })
658    }
659
660    fn unique_id(&self) -> Uuid {
661        self.id
662    }
663}
664
// --- Reactants ---
666
667pub trait ErasedInternalReactantFactory: Send + Sync + 'static {
668    #[allow(clippy::type_complexity)]
669    fn create_reactant(
670        &self,
671        current_ganglion_id: Uuid,
672        internal_ganglia: Arc<
673            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
674        >,
675        external_ganglia: Arc<
676            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
677        >,
678        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
679    ) -> Arc<dyn ReactantErased + Send + Sync + 'static>;
680}
681
682pub trait ErasedExternalInternalReactantFactory: Send + Sync + 'static {
683    #[allow(clippy::type_complexity)]
684    fn create_reactant(
685        &self,
686        current_ganglion_id: Uuid,
687        internal_ganglia: Arc<
688            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
689        >,
690        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
691    ) -> Arc<dyn ReactantErased + Send + Sync + 'static>;
692}
693
694pub trait ErasedExternalExternalReactantFactory: Send + Sync + 'static {
695    #[allow(clippy::type_complexity)]
696    fn create_reactant(
697        &self,
698        current_ganglion_id: Uuid,
699        external_ganglia: Arc<
700            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
701        >,
702        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
703    ) -> Arc<dyn ReactantRawErased + Send + Sync + 'static>;
704}
705
/// Stateless factory that remembers the payload type `T` and codec `C` of a
/// neuron at the type level only.
pub struct PlexusInternalReactantFactory<T, C> {
    /// Carries `T` and `C` without storing a value of either.
    _phantom: PhantomData<(T, C)>,
}

impl<T, C> PlexusInternalReactantFactory<T, C> {
    /// Creates the factory; equivalent to `Default::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}

// Implemented by hand: `#[derive(Default)]` would require `T: Default` and
// `C: Default`.
impl<T, C> Default for PlexusInternalReactantFactory<T, C> {
    fn default() -> Self {
        Self {
            _phantom: PhantomData,
        }
    }
}
723
724impl<T, C> ErasedInternalReactantFactory for PlexusInternalReactantFactory<T, C>
725where
726    C: Codec<T> + CodecName + Send + Sync + 'static,
727    T: Send + Sync + 'static,
728{
729    fn create_reactant(
730        &self,
731        current_ganglion_id: Uuid,
732        internal_ganglia: Arc<
733            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
734        >,
735        external_ganglia: Arc<
736            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
737        >,
738        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
739    ) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
740        erase_reactant(Box::new(PlexusInternalReactant::<T, C>::new(
741            current_ganglion_id,
742            internal_ganglia,
743            external_ganglia,
744            reactions,
745        )))
746    }
747}
748
/// Stateless factory that remembers the payload type `T` and codec `C` of a
/// neuron at the type level only.
pub struct PlexusExternalInternalReactantFactory<T, C> {
    /// Carries `T` and `C` without storing a value of either.
    _phantom: PhantomData<(T, C)>,
}

impl<T, C> PlexusExternalInternalReactantFactory<T, C> {
    /// Creates the factory; equivalent to `Default::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}

// Implemented by hand: `#[derive(Default)]` would require `T: Default` and
// `C: Default`.
impl<T, C> Default for PlexusExternalInternalReactantFactory<T, C> {
    fn default() -> Self {
        Self {
            _phantom: PhantomData,
        }
    }
}
766
767impl<T, C> ErasedExternalInternalReactantFactory for PlexusExternalInternalReactantFactory<T, C>
768where
769    C: Codec<T> + CodecName + Send + Sync + 'static,
770    T: Send + Sync + 'static,
771{
772    fn create_reactant(
773        &self,
774        current_ganglion_id: Uuid,
775        internal_ganglia: Arc<
776            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
777        >,
778        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
779    ) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
780        erase_reactant(Box::new(PlexusExternalInternalReactant::<T, C>::new(
781            current_ganglion_id,
782            internal_ganglia,
783            reactions,
784        )))
785    }
786}
787
/// Stateless factory that remembers the payload type `T` and codec `C` of a
/// neuron at the type level only.
pub struct PlexusExternalExternalReactantFactory<T, C> {
    /// Carries `T` and `C` without storing a value of either.
    _phantom: PhantomData<(T, C)>,
}

impl<T, C> PlexusExternalExternalReactantFactory<T, C> {
    /// Creates the factory; equivalent to `Default::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}

// Implemented by hand: `#[derive(Default)]` would require `T: Default` and
// `C: Default`.
impl<T, C> Default for PlexusExternalExternalReactantFactory<T, C> {
    fn default() -> Self {
        Self {
            _phantom: PhantomData,
        }
    }
}
805
806impl<T, C> ErasedExternalExternalReactantFactory for PlexusExternalExternalReactantFactory<T, C>
807where
808    C: Codec<T> + CodecName + Send + Sync + 'static,
809    T: Send + Sync + 'static,
810{
811    fn create_reactant(
812        &self,
813        current_ganglion_id: Uuid,
814        external_ganglia: Arc<
815            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
816        >,
817        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
818    ) -> Arc<dyn ReactantRawErased + Send + Sync + 'static> {
819        erase_reactant_raw(Box::new(PlexusExternalExternalReactant::<T, C>::new(
820            current_ganglion_id,
821            external_ganglia,
822            reactions,
823        )))
824    }
825}
826
827#[allow(clippy::type_complexity)]
828pub struct PlexusInternalReactant<T, C>
829where
830    C: Codec<T> + CodecName + Send + Sync + 'static,
831    T: Send + Sync + 'static,
832{
833    current_ganglion_id: Uuid,
834    internal_ganglia:
835        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>>,
836    external_ganglia:
837        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>>,
838    reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
839    _phantom: PhantomData<(T, C)>,
840}
841
842impl<T, C> Clone for PlexusInternalReactant<T, C>
843where
844    C: Codec<T> + CodecName + Send + Sync + 'static,
845    T: Send + Sync + 'static,
846{
847    fn clone(&self) -> Self {
848        Self {
849            current_ganglion_id: self.current_ganglion_id,
850            internal_ganglia: self.internal_ganglia.clone(),
851            external_ganglia: self.external_ganglia.clone(),
852            reactions: self.reactions.clone(),
853            _phantom: self._phantom,
854        }
855    }
856}
857
858impl<T, C> PlexusInternalReactant<T, C>
859where
860    C: Codec<T> + CodecName + Send + Sync + 'static,
861    T: Send + Sync + 'static,
862{
863    #[allow(clippy::type_complexity)]
864    pub fn new(
865        current_ganglion_id: Uuid,
866        internal_ganglia: Arc<
867            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
868        >,
869        external_ganglia: Arc<
870            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
871        >,
872        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
873    ) -> Self {
874        Self {
875            current_ganglion_id,
876            internal_ganglia,
877            external_ganglia,
878            reactions,
879            _phantom: PhantomData,
880        }
881    }
882}
883
884impl<T, C> Reactant<T, C> for PlexusInternalReactant<T, C>
885where
886    C: Codec<T> + CodecName + Send + Sync + 'static,
887    T: Send + Sync + 'static,
888{
889    fn react(
890        &self,
891        payload: Arc<Payload<T, C>>,
892    ) -> Pin<Box<dyn Future<Output = Result<(), crate::reactant::ReactantError>> + Send + 'static>> {
893        let current_ganglion_id = self.current_ganglion_id;
894        let internal_ganglia = self.internal_ganglia.clone();
895        let external_ganglia = self.external_ganglia.clone();
896        let reactions = self.reactions.clone();
897
898        let payload_clone = payload.clone();
899        Box::pin(
900            async move {
901                tracing::debug!("Starting reaction for ganglion {current_ganglion_id}");
902
903                let reaction_set_arc = reactions
904                    .get_with(payload_clone.span_id(), async {
905                        Arc::new(Mutex::new(HashSet::new()))
906                    })
907                    .await;
908
909                let reaction_set_copy = {
910                    let mut set = reaction_set_arc.lock().await;
911                    if set.contains(&current_ganglion_id) {
912                        tracing::debug!(
913                        "Ganglion {current_ganglion_id} already processed reaction, returning early"
914                    );
915                        return Ok(());
916                    }
917                    set.insert(current_ganglion_id);
918                    set.clone()
919                };
920
921                let erased_payload = erase_payload(payload_clone.clone());
922
923                type UnifiedTransmitFuture = Pin<
924                    Box<
925                        dyn Future<Output = Result<Vec<()>, crate::ganglion::GanglionError>> + Send,
926                    >,
927                >;
928
929                let internal_ganglia_guard = internal_ganglia.read().await;
930                let internal_ganglia_to_process: Vec<_> = internal_ganglia_guard
931                    .iter()
932                    .filter(|(ganglion_id, _ganglion)| {
933                        *ganglion_id != &current_ganglion_id
934                            && !reaction_set_copy.contains(ganglion_id)
935                    })
936                    .map(|(_ganglion_id, ganglion)| ganglion.clone())
937                    .collect();
938                drop(internal_ganglia_guard);
939
940                let external_ganglia_guard = external_ganglia.read().await;
941                let external_ganglia_to_process: Vec<_> = external_ganglia_guard
942                    .iter()
943                    .filter(|(ganglion_id, _ganglion)| !reaction_set_copy.contains(ganglion_id))
944                    .map(|(_ganglion_id, ganglion)| ganglion.clone())
945                    .collect();
946                drop(external_ganglia_guard);
947
948                let internal_futures =
949                    internal_ganglia_to_process
950                        .into_iter()
951                        .map(|ganglion_mutex| {
952                            let payload = erased_payload.clone();
953                            Box::pin(async move {
954                    let mut ganglion = ganglion_mutex.lock().await;
955                    ganglion.transmit(payload.clone()).await
956                }) as UnifiedTransmitFuture
957                        });
958
959                let external_futures =
960                    external_ganglia_to_process
961                        .into_iter()
962                        .map(|ganglion_mutex| {
963                            let payload = erased_payload.clone();
964                            Box::pin(async move {
965                    let mut ganglion = ganglion_mutex.lock().await;
966                    ganglion.transmit(payload.clone()).await
967                }) as UnifiedTransmitFuture
968                        });
969
970                let all_futures: Vec<_> = internal_futures.chain(external_futures).collect();
971                let _results = join_all(all_futures).await;
972                Ok(())
973            }
974            .instrument(payload.span_debug("PlexusInternalReactant::react")),
975        )
976    }
977
978    fn erase(self: Box<Self>) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
979        erase_reactant(self)
980    }
981}
982
983/// Safe wrapper for plexus internal reaction from external ganglia
984#[allow(clippy::type_complexity)]
985pub struct PlexusExternalInternalReactant<T, C>
986where
987    C: Codec<T> + CodecName + Send + Sync + 'static,
988    T: Send + Sync + 'static,
989{
990    current_ganglion_id: Uuid,
991    internal_ganglia:
992        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>>,
993    reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
994    _phantom: PhantomData<(T, C)>,
995}
996
997impl<T, C> Clone for PlexusExternalInternalReactant<T, C>
998where
999    C: Codec<T> + CodecName + Send + Sync + 'static,
1000    T: Send + Sync + 'static,
1001{
1002    fn clone(&self) -> Self {
1003        Self {
1004            current_ganglion_id: self.current_ganglion_id,
1005            internal_ganglia: self.internal_ganglia.clone(),
1006            reactions: self.reactions.clone(),
1007            _phantom: self._phantom,
1008        }
1009    }
1010}
1011
1012impl<T, C> PlexusExternalInternalReactant<T, C>
1013where
1014    C: Codec<T> + CodecName + Send + Sync + 'static,
1015    T: Send + Sync + 'static,
1016{
1017    #[allow(clippy::type_complexity)]
1018    pub fn new(
1019        current_ganglion_id: Uuid,
1020        internal_ganglia: Arc<
1021            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
1022        >,
1023        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
1024    ) -> Self {
1025        Self {
1026            current_ganglion_id,
1027            internal_ganglia,
1028            reactions,
1029            _phantom: PhantomData,
1030        }
1031    }
1032}
1033
1034impl<T, C> Reactant<T, C> for PlexusExternalInternalReactant<T, C>
1035where
1036    C: Codec<T> + CodecName + Send + Sync + 'static,
1037    T: Send + Sync + 'static,
1038{
1039    fn react(
1040        &self,
1041        payload: Arc<Payload<T, C>>,
1042    ) -> Pin<Box<dyn Future<Output = Result<(), crate::reactant::ReactantError>> + Send + 'static>> {
1043        let current_ganglion_id = self.current_ganglion_id;
1044        let internal_ganglia = self.internal_ganglia.clone();
1045        let reactions = self.reactions.clone();
1046
1047        let payload_clone = payload.clone();
1048        Box::pin(
1049            async move {
1050                tracing::debug!("Starting reaction for ganglion {current_ganglion_id}");
1051
1052                let reaction_set_arc = reactions
1053                    .get_with(payload_clone.span_id(), async {
1054                        Arc::new(Mutex::new(HashSet::new()))
1055                    })
1056                    .await;
1057
1058                let reaction_set_copy = {
1059                    let mut set = reaction_set_arc.lock().await;
1060                    if set.contains(&current_ganglion_id) {
1061                        tracing::debug!(
1062                        "Ganglion {current_ganglion_id} already processed reaction, returning early"
1063                    );
1064                        return Ok(());
1065                    }
1066                    set.insert(current_ganglion_id);
1067                    set.clone()
1068                };
1069
1070                let erased_payload = erase_payload(payload_clone.clone());
1071
1072                let internal_ganglia_guard = internal_ganglia.read().await;
1073                let internal_ganglia_to_process: Vec<_> = internal_ganglia_guard
1074                    .iter()
1075                    .filter(|(ganglion_id, _ganglion)| {
1076                        !reaction_set_copy.contains(ganglion_id)
1077                    })
1078                    .map(|(_ganglion_id, ganglion)| ganglion.clone())
1079                    .collect();
1080                drop(internal_ganglia_guard);
1081
1082                let internal_futures = internal_ganglia_to_process.into_iter().map(
1083                    |ganglion_mutex| {
1084                        let payload = erased_payload.clone();
1085                        Box::pin(async move {
1086                    let mut ganglion = ganglion_mutex.lock().await;
1087                    ganglion.transmit(payload.clone()).await
1088                })
1089                    },
1090                );
1091
1092                let _results = join_all(internal_futures).await;
1093                Ok(())
1094            }
1095            .instrument(payload.span_debug("PlexusExternalInternalReactant::react")),
1096        )
1097    }
1098
1099    fn erase(self: Box<Self>) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
1100        erase_reactant(self)
1101    }
1102}
1103
1104/// Safe wrapper for plexus external reaction from external ganglia
1105#[allow(clippy::type_complexity)]
1106pub struct PlexusExternalExternalReactant<T, C>
1107where
1108    C: Codec<T> + CodecName + Send + Sync + 'static,
1109    T: Send + Sync + 'static,
1110{
1111    current_ganglion_id: Uuid,
1112    external_ganglia:
1113        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>>,
1114    reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
1115    _phantom: PhantomData<(T, C)>,
1116}
1117
1118impl<T, C> Clone for PlexusExternalExternalReactant<T, C>
1119where
1120    C: Codec<T> + CodecName + Send + Sync + 'static,
1121    T: Send + Sync + 'static,
1122{
1123    fn clone(&self) -> Self {
1124        Self {
1125            current_ganglion_id: self.current_ganglion_id,
1126            external_ganglia: self.external_ganglia.clone(),
1127            reactions: self.reactions.clone(),
1128            _phantom: self._phantom,
1129        }
1130    }
1131}
1132
1133impl<T, C> PlexusExternalExternalReactant<T, C>
1134where
1135    C: Codec<T> + CodecName + Send + Sync + 'static,
1136    T: Send + Sync + 'static,
1137{
1138    #[allow(clippy::type_complexity)]
1139    pub fn new(
1140        current_ganglion_id: Uuid,
1141        external_ganglia: Arc<
1142            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
1143        >,
1144        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
1145    ) -> Self {
1146        Self {
1147            current_ganglion_id,
1148            external_ganglia,
1149            reactions,
1150            _phantom: PhantomData,
1151        }
1152    }
1153}
1154
1155impl<T, C> ReactantRaw<T, C> for PlexusExternalExternalReactant<T, C>
1156where
1157    C: Codec<T> + CodecName + Send + Sync + 'static,
1158    T: Send + Sync + 'static,
1159{
1160    fn react(
1161        &self,
1162        payload: Arc<PayloadRaw<T, C>>,
1163    ) -> Pin<Box<dyn Future<Output = Result<(), crate::reactant::ReactantError>> + Send + 'static>> {
1164        let current_ganglion_id = self.current_ganglion_id;
1165        let external_ganglia = self.external_ganglia.clone();
1166        let reactions = self.reactions.clone();
1167
1168        let payload_clone = payload.clone();
1169        Box::pin(
1170            async move {
1171                tracing::debug!("Starting reaction for ganglion {current_ganglion_id}");
1172
1173                let reaction_set_arc = reactions
1174                    .get_with(payload_clone.span_id(), async {
1175                        Arc::new(Mutex::new(HashSet::new()))
1176                    })
1177                    .await;
1178
1179                let reaction_set_copy = {
1180                    let mut set = reaction_set_arc.lock().await;
1181                    if set.contains(&current_ganglion_id) {
1182                        tracing::debug!(
1183                        "PlexusExternalExternalReactant::react - Ganglion {current_ganglion_id} already processed reaction, returning early"
1184                    );
1185                        return Ok(());
1186                    }
1187                    set.insert(current_ganglion_id);
1188                    set.clone()
1189                };
1190
1191                let erased_payload = erase_payload_raw(payload_clone.clone());
1192
1193                let external_ganglia_guard = external_ganglia.read().await;
1194                let external_ganglia_to_process: Vec<_> = external_ganglia_guard
1195                    .iter()
1196                    .filter(|(ganglion_id, _ganglion)| {
1197                        *ganglion_id != &current_ganglion_id
1198                            && !reaction_set_copy.contains(ganglion_id)
1199                    })
1200                    .map(|(_ganglion_id, ganglion)| ganglion.clone())
1201                    .collect();
1202                drop(external_ganglia_guard);
1203
1204                let external_futures =
1205                    external_ganglia_to_process
1206                        .into_iter()
1207                        .map(|ganglion_mutex| {
1208                            let payload = erased_payload.clone();
1209                            Box::pin(async move {
1210                    let mut ganglion = ganglion_mutex.lock().await;
1211                    ganglion.transmit_encoded(payload.clone()).await
1212                })
1213                        });
1214
1215                let _results = join_all(external_futures).await;
1216                Ok(())
1217            }
1218            .instrument(payload.span_debug("PlexusExternalExternalReactant::react")),
1219        )
1220    }
1221
1222    fn erase_raw(self: Box<Self>) -> Arc<dyn ReactantRawErased + Send + Sync + 'static> {
1223        erase_reactant_raw(self)
1224    }
1225}
1226
#[cfg(test)]
mod tests {
    use super::*;
    use crate::erasure::payload::{erase_payload, erase_payload_raw};
    use crate::erasure::reactant::{erase_reactant, erase_reactant_raw};
    use crate::logging::TraceContext;
    use crate::neuron::NeuronImpl;
    use crate::payload::{Payload, PayloadRaw};
    use crate::test_utils::{
        DebugCodec, DebugStruct, GanglionExternalInprocess, TokioMpscReactant,
        TokioMpscReactantRaw, test_namespace,
    };
    use tokio::sync::mpsc::channel;
    use uuid::Uuid;

    /// `Itertools::cartesian_product` yields the same (neuron, ganglion) pairs
    /// as two nested `for` loops.
    #[test]
    fn test_cartesian_product_equivalence() {
        let neurons = [("neuron1", "Neuron1"), ("neuron2", "Neuron2")];
        let ganglia = [("ganglion1", "uuid1"), ("ganglion2", "uuid2")];

        let mut old_combinations = HashSet::new();
        for (neuron_name, _neuron) in neurons.iter() {
            for (ganglion_id, _ganglion_mutex) in ganglia.iter() {
                old_combinations.insert((neuron_name.to_string(), ganglion_id.to_string()));
            }
        }

        let new_combinations: HashSet<(String, String)> = neurons
            .iter()
            .cartesian_product(ganglia.iter())
            .map(|((neuron_name, _neuron), (ganglion_id, _ganglion_mutex))| {
                (neuron_name.to_string(), ganglion_id.to_string())
            })
            .collect();

        assert_eq!(old_combinations, new_combinations);
        assert_eq!(old_combinations.len(), 4);
    }

    /// A plexus built from empty lists has no relevant and no ignored neurons.
    #[tokio::test]
    async fn test_plexus_creation() {
        let plexus: Plexus = Plexus::new(vec![], vec![]).await;

        assert_eq!(plexus.relevant_neurons.len(), 0);
        assert_eq!(plexus.ignored_neurons.len(), 0);
    }

    /// A plexus built from empty lists reports a fresh neuron as capable.
    #[tokio::test]
    async fn test_plexus_capable() {
        let ns = test_namespace();
        let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;

        let neuron: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron_arc = Arc::new(neuron);

        assert!(plexus.capable(neuron_arc));
    }

    /// Adapting one neuron leaves exactly one entry in `plexus.neurons`.
    #[tokio::test]
    async fn test_plexus_adapt() {
        let ns = test_namespace();
        let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;

        let neuron: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());

        plexus
            .adapt(Arc::new(neuron))
            .await
            .expect("Failed to adapt neuron");

        let neurons = plexus.neurons.read().await;
        assert_eq!(neurons.len(), 1);
    }

    /// A neuron that passes `capable` can be adapted and is then registered.
    #[tokio::test]
    async fn test_plexus_ganglion_internal_adapt() {
        let ns = test_namespace();
        let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;

        let neuron: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron_arc = Arc::new(neuron);

        assert!(plexus.capable(neuron_arc.clone()));

        plexus
            .adapt(neuron_arc)
            .await
            .expect("Failed to adapt neuron");

        let neurons = plexus.neurons.read().await;
        assert_eq!(neurons.len(), 1);
    }

    /// Payloads transmitted through an in-process ganglion and raw payloads
    /// transmitted through an external in-process ganglion reach every
    /// registered reactant with the original value.
    #[tokio::test]
    async fn test_plexus_external_inprocess_transmit_via_adapt() {
        let ns = test_namespace();

        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
        let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
        let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
        let neuron_arc = neuron_impl.clone_to_arc();

        let mut ganglion_inprocess = GanglionInprocess::new();
        let mut ganglion_external_inprocess = GanglionExternalInprocess::new();

        let _ = ganglion_inprocess.adapt(neuron_arc.clone()).await;
        ganglion_external_inprocess
            .adapt(neuron_arc.clone())
            .await
            .expect("Failed to adapt neuron");

        let erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
            erase_reactant(Box::new(TokioMpscReactant::new(tx1))),
            erase_reactant(Box::new(TokioMpscReactant::new(tx2))),
        ];

        let erased_raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
            erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx1))),
            erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx2))),
        ];

        ganglion_inprocess
            .react(neuron_arc.name(), erased_reactants, vec![])
            .await
            .expect("Failed to react");
        ganglion_external_inprocess
            .react(neuron_arc.name(), vec![], erased_raw_reactants, vec![])
            .await
            .expect("Failed to react raw");

        let test_payload = Payload::with_correlation(
            DebugStruct {
                foo: 42,
                bar: "test".to_string(),
            },
            neuron_arc.clone(),
            None,
        );

        let test_payload_raw = PayloadRaw::with_correlation(
            DebugCodec::encode(&DebugStruct {
                foo: 42,
                bar: "test".to_string(),
            })
            .expect("Failed to encode test data"),
            neuron_arc.clone(),
            None,
        );

        let erased_payload = erase_payload(test_payload);
        ganglion_inprocess
            .transmit(erased_payload)
            .await
            .expect("Failed to transmit");

        let erased_payload_raw = erase_payload_raw(test_payload_raw);
        ganglion_external_inprocess
            .transmit_encoded(erased_payload_raw)
            .await
            .expect("Failed to transmit encoded");

        let received1 = rx1.recv().await.expect("Failed to receive payload 1");
        let received2 = rx2.recv().await.expect("Failed to receive payload 2");
        let raw_received1 = raw_rx1
            .recv()
            .await
            .expect("Failed to receive raw payload 1");
        let raw_received2 = raw_rx2
            .recv()
            .await
            .expect("Failed to receive raw payload 2");

        assert_eq!(received1.value.foo, 42);
        assert_eq!(received2.value.foo, 42);
        let decoded1 =
            DebugCodec::decode(&raw_received1.value).expect("Failed to decode raw payload 1");
        let decoded2 =
            DebugCodec::decode(&raw_received2.value).expect("Failed to decode raw payload 2");
        assert_eq!(decoded1.foo, 42);
        assert_eq!(decoded2.foo, 42);
    }

    /// Same scenario as the single-task transmit test, but the ganglia are
    /// shared behind `Arc<Mutex<_>>` and each transmit runs in its own spawned
    /// tokio task.
    #[tokio::test]
    async fn test_plexus_external_inprocess_across_threads() {
        let ns = test_namespace();

        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
        let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
        let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron_arc = neuron_impl.clone_to_arc();

        let ganglion_inprocess = Arc::new(Mutex::new(GanglionInprocess::new()));
        let ganglion_external_inprocess = Arc::new(Mutex::new(GanglionExternalInprocess::new()));

        {
            let mut ganglion = ganglion_inprocess.lock().await;
            let _ = ganglion.adapt(neuron_arc.clone()).await;
        }
        {
            let mut ganglion = ganglion_external_inprocess.lock().await;
            ganglion
                .adapt(neuron_arc.clone())
                .await
                .expect("Failed to adapt neuron");
        }

        let erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
            erase_reactant(Box::new(TokioMpscReactant::new(tx1))),
            erase_reactant(Box::new(TokioMpscReactant::new(tx2))),
        ];

        let erased_raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
            erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx1))),
            erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx2))),
        ];

        {
            let mut ganglion = ganglion_inprocess.lock().await;
            ganglion
                .react(neuron_arc.name(), erased_reactants, vec![])
                .await
                .expect("Failed to react");
        }
        {
            let mut ganglion = ganglion_external_inprocess.lock().await;
            ganglion
                .react(neuron_arc.name(), vec![], erased_raw_reactants, vec![])
                .await
                .expect("Failed to react raw");
        }

        let ganglion_inprocess_clone = ganglion_inprocess.clone();
        let ganglion_external_inprocess_clone = ganglion_external_inprocess.clone();

        let neuron_arc_clone = neuron_arc.clone();
        let task1 = tokio::task::spawn(async move {
            let test_payload = Payload::with_correlation(
                DebugStruct {
                    foo: 42,
                    bar: "test".to_string(),
                },
                neuron_arc_clone.clone(),
                None,
            );

            let erased_payload = erase_payload(test_payload);
            let mut ganglion = ganglion_inprocess_clone.lock().await;
            ganglion
                .transmit(erased_payload)
                .await
                .expect("Failed to transmit");
        });

        let neuron_arc_clone2 = neuron_arc.clone();
        let task2 = tokio::task::spawn(async move {
            let test_payload_raw = PayloadRaw::with_correlation(
                DebugCodec::encode(&DebugStruct {
                    foo: 42,
                    bar: "test".to_string(),
                })
                .expect("Failed to encode test data"),
                neuron_arc_clone2.clone(),
                None,
            );

            let erased_payload_raw = erase_payload_raw(test_payload_raw);
            let mut ganglion = ganglion_external_inprocess_clone.lock().await;
            ganglion
                .transmit_encoded(erased_payload_raw)
                .await
                .expect("Failed to transmit encoded");
        });

        task1.await.expect("Task 1 failed");
        task2.await.expect("Task 2 failed");

        let received1 = rx1.recv().await.expect("Failed to receive payload 1");
        let received2 = rx2.recv().await.expect("Failed to receive payload 2");
        let raw_received1 = raw_rx1
            .recv()
            .await
            .expect("Failed to receive raw payload 1");
        let raw_received2 = raw_rx2
            .recv()
            .await
            .expect("Failed to receive raw payload 2");

        assert_eq!(received1.value.foo, 42);
        assert_eq!(received2.value.foo, 42);
        let decoded1 =
            DebugCodec::decode(&raw_received1.value).expect("Failed to decode raw payload 1");
        let decoded2 =
            DebugCodec::decode(&raw_received2.value).expect("Failed to decode raw payload 2");
        assert_eq!(decoded1.foo, 42);
        assert_eq!(decoded2.foo, 42);
    }

    /// An encoded payload transmitted on an infused external ganglion arrives
    /// exactly once, decoded and with its correlation id intact, at a reactant
    /// registered on the plexus.
    #[tokio::test]
    async fn test_plexus_transmit_encoded_external_to_internal() {
        let ns = test_namespace();
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron_arc = neuron_impl.clone_to_arc();

        let (tx_internal, mut rx_internal) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);

        let external_ganglion = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
        let mut internal_plexus = Plexus::new(vec![erase_neuron(neuron_arc.clone())], vec![]).await;

        {
            let mut g = external_ganglion.lock().await;
            g.adapt(neuron_arc.clone())
                .await
                .expect("Failed to adapt neuron to external ganglion");
        }
        internal_plexus
            .adapt(neuron_arc.clone())
            .await
            .expect("Failed to adapt neuron to internal plexus");

        let reactants = vec![erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(
            TokioMpscReactant::new(tx_internal),
        ))];
        internal_plexus
            .react(neuron_arc.name(), reactants, vec![])
            .await
            .expect("Failed to add reactant to internal plexus");

        // Fail here, with a clear message, if the wiring itself is broken;
        // otherwise the problem would only show up as a missing payload below.
        internal_plexus
            .infuse_external_ganglion(external_ganglion.clone())
            .await
            .expect("Failed to infuse external ganglion");

        let test_data = DebugStruct {
            foo: 123,
            bar: "plexus_transmit_encoded_test".to_string(),
        };
        let encoded_data = neuron_impl
            .encode(&test_data)
            .expect("Failed to encode test data");
        let correlation_id = Uuid::now_v7();

        let span_id = Uuid::now_v7().as_u128() as u64;
        let payload_raw = Arc::new(PayloadRaw {
            value: Arc::new(encoded_data),
            neuron: neuron_arc.clone(),
            trace: TraceContext::from_parts(correlation_id, span_id, None),
        });

        {
            let mut g = external_ganglion.lock().await;
            let erased_payload_raw = erase_payload_raw(payload_raw.clone());
            g.transmit_encoded(erased_payload_raw)
                .await
                .expect("Failed to transmit encoded payload");
        }

        // Give any duplicate deliveries time to arrive before counting.
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert_eq!(
            rx_internal.len(),
            1,
            "Internal plexus should have received exactly one payload"
        );
        let received_payload = rx_internal
            .recv()
            .await
            .expect("Should receive decoded payload in internal plexus");

        assert_eq!(
            received_payload.value.foo, test_data.foo,
            "Decoded payload should have correct foo value"
        );
        assert_eq!(
            received_payload.value.bar, test_data.bar,
            "Decoded payload should have correct bar value"
        );
        assert_eq!(
            received_payload.correlation_id(), correlation_id,
            "Correlation ID should match"
        );
    }

    /// Internal and external ganglia can each be infused and then excised,
    /// both by id and by instance; excision returns the ganglion and removes
    /// it from the corresponding map.
    #[tokio::test]
    async fn test_plexus_ganglion_excision() {
        let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;

        // Internal ganglion, excised by id.
        let internal_ganglion_1 = Arc::new(Mutex::new(GanglionInprocess::new()));
        let internal_id_1 = { internal_ganglion_1.lock().await.unique_id() };

        plexus
            .infuse_ganglion(internal_ganglion_1.clone())
            .await
            .expect("Failed to infuse internal ganglion 1");

        {
            let internal_ganglia = plexus.internal_ganglia.read().await;
            assert!(internal_ganglia.contains_key(&internal_id_1));
        }

        let excised_internal_1 = plexus
            .excise_ganglion_by_id(internal_id_1)
            .await
            .expect("Failed to excise internal ganglion 1 by ID");
        assert!(excised_internal_1.is_some());
        assert_eq!(
            excised_internal_1.unwrap().lock().await.unique_id(),
            internal_id_1
        );

        {
            let internal_ganglia = plexus.internal_ganglia.read().await;
            assert!(!internal_ganglia.contains_key(&internal_id_1));
        }

        // Internal ganglion, excised by instance.
        let internal_ganglion_2 = Arc::new(Mutex::new(GanglionInprocess::new()));
        let internal_id_2 = { internal_ganglion_2.lock().await.unique_id() };

        plexus
            .infuse_ganglion(internal_ganglion_2.clone())
            .await
            .expect("Failed to infuse internal ganglion 2");

        let excised_internal_2 = plexus
            .excise_ganglion(internal_ganglion_2.clone())
            .await
            .expect("Failed to excise internal ganglion 2 by instance");
        assert!(excised_internal_2.is_some());
        assert_eq!(
            excised_internal_2.unwrap().lock().await.unique_id(),
            internal_id_2
        );

        {
            let internal_ganglia = plexus.internal_ganglia.read().await;
            assert!(!internal_ganglia.contains_key(&internal_id_2));
        }

        // External ganglion, excised by id.
        let external_ganglion_1 = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
        let external_id_1 = { external_ganglion_1.lock().await.unique_id() };

        plexus
            .infuse_external_ganglion(external_ganglion_1.clone())
            .await
            .expect("Failed to infuse external ganglion 1");

        {
            let external_ganglia = plexus.external_ganglia.read().await;
            assert!(external_ganglia.contains_key(&external_id_1));
        }

        let excised_external_1 = plexus
            .excise_external_ganglion_by_id(external_id_1)
            .await
            .expect("Failed to excise external ganglion 1 by ID");
        assert!(excised_external_1.is_some());
        assert_eq!(
            excised_external_1.unwrap().lock().await.unique_id(),
            external_id_1
        );

        {
            let external_ganglia = plexus.external_ganglia.read().await;
            assert!(!external_ganglia.contains_key(&external_id_1));
        }

        // External ganglion, excised by instance.
        let external_ganglion_2 = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
        let external_id_2 = { external_ganglion_2.lock().await.unique_id() };

        plexus
            .infuse_external_ganglion(external_ganglion_2.clone())
            .await
            .expect("Failed to infuse external ganglion 2");

        let excised_external_2 = plexus
            .excise_external_ganglion(external_ganglion_2.clone())
            .await
            .expect("Failed to excise external ganglion 2 by instance");
        assert!(excised_external_2.is_some());
        assert_eq!(
            excised_external_2.unwrap().lock().await.unique_id(),
            external_id_2
        );

        {
            let external_ganglia = plexus.external_ganglia.read().await;
            assert!(!external_ganglia.contains_key(&external_id_2));
        }
    }
}
1719