Skip to main content

plexor_core/plexus/
mod.rs

1// Copyright 2025 Alecks Gates
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7pub mod reactants;
8
9use crate::axon::AxonError;
10use crate::codec::{Codec, CodecName};
11use crate::erasure::neuron::{NeuronErased, erase_neuron};
12use crate::erasure::payload::PayloadErased;
13use crate::erasure::reactant::{ErrorReactantErased, ReactantErased, ReactantRawErased};
14use crate::ganglion::{
15    Ganglion, GanglionError, GanglionExternal, GanglionInprocess, GanglionInternal,
16};
17use crate::neuron::Neuron;
18use crate::plexus::reactants::{
19    ErasedExternalInternalReactantFactory, PlexusExternalExternalReactantFactory,
20    PlexusExternalInternalReactantFactory, PlexusInternalReactantFactory,
21};
22use itertools::Itertools;
23use moka::future::Cache;
24use reactants::{ErasedExternalExternalReactantFactory, ErasedInternalReactantFactory};
25use std::collections::{HashMap, HashSet};
26use std::future::Future;
27use std::pin::Pin;
28use std::sync::Arc;
29use std::time::Duration;
30use thiserror::Error;
31use tokio::sync::{Mutex, RwLock};
32use uuid::Uuid;
33
/// Errors returned by `Plexus` operations.
#[derive(Error, Debug)]
pub enum PlexusError {
    /// A failure reported by a ganglion. Produced automatically from `GanglionError`
    /// (via `#[from]`) and from most `AxonError` variants.
    #[error("Ganglion error: {0}")]
    Ganglion(#[from] GanglionError),
    /// The external-ganglia map could not be locked.
    // NOTE(review): none of the `*Lock` variants is constructed in the portion of this
    // module visible at review time — confirm whether they are still needed.
    #[error("Failed to acquire lock on external ganglia")]
    ExternalGangliaLock,
    /// The internal-ganglia map could not be locked.
    #[error("Failed to acquire lock on internal ganglia")]
    InternalGangliaLock,
    /// The neurons map could not be locked.
    #[error("Failed to acquire lock on neurons")]
    NeuronsLock,
    /// The reactant-factories map could not be locked.
    #[error("Failed to acquire lock on reactant factories")]
    ReactantFactoriesLock,
    /// The neuron/ganglion pair set could not be locked.
    #[error("Failed to acquire lock on neuron ganglia")]
    NeuronGangliaLock,
    /// The reactions cache could not be locked.
    #[error("Failed to acquire lock on reactions")]
    ReactionsLock,
    /// Adapting the named neuron failed.
    #[error("Neuron adaptation failed for {neuron_name}")]
    NeuronAdaptation { neuron_name: String },
    /// No reactant factories were registered for the named neuron when a reactant had to
    /// be created for it.
    #[error("Reactant creation failed for {neuron_name}")]
    ReactantCreation { neuron_name: String },
    /// A transmission did not complete; this is what `AxonError::TransmissionTimeout`
    /// converts into.
    #[error("Transmission failed")]
    Transmission,
}
57
58impl From<AxonError> for PlexusError {
59    fn from(error: AxonError) -> Self {
60        match error {
61            AxonError::GanglionError(e) => PlexusError::Ganglion(e),
62            AxonError::NeuronNotAdapted {
63                neuron_name,
64                ganglion_name,
65                ganglion_id,
66            } => PlexusError::Ganglion(GanglionError::SynapseNotFound {
67                neuron_name,
68                ganglion_name,
69                ganglion_id,
70            }),
71            AxonError::SynapseLock {
72                neuron_name,
73                ganglion_name,
74                ganglion_id,
75            } => PlexusError::Ganglion(GanglionError::SynapseLock {
76                neuron_name,
77                ganglion_name,
78                ganglion_id,
79            }),
80            AxonError::Transmit {
81                neuron_name,
82                ganglion_name,
83                ganglion_id,
84                message,
85            } => PlexusError::Ganglion(GanglionError::Transmit {
86                neuron_name,
87                ganglion_name,
88                ganglion_id,
89                message,
90            }),
91            AxonError::Encode {
92                neuron_name,
93                ganglion_name,
94                ganglion_id,
95            } => PlexusError::Ganglion(GanglionError::Encode {
96                neuron_name,
97                ganglion_name,
98                ganglion_id,
99            }),
100            AxonError::Decode {
101                neuron_name,
102                ganglion_name,
103                ganglion_id,
104            } => PlexusError::Ganglion(GanglionError::Decode {
105                neuron_name,
106                ganglion_name,
107                ganglion_id,
108            }),
109            AxonError::TransmissionTimeout => PlexusError::Transmission,
110        }
111    }
112}
113
/// The type-erased reactant factories registered for one neuron.
///
/// `Plexus` keeps one of these per neuron name and uses it to build the reactants that
/// are handed to each ganglion's `react` call.
pub struct PlexusReactantFactories {
    /// Builds the reactant installed on an internal ganglion.
    pub internal_factory: Arc<dyn ErasedInternalReactantFactory + Send + Sync>,
    /// Builds the decoded-payload reactant (`ReactantErased`) installed on an external ganglion.
    pub external_internal_factory: Arc<dyn ErasedExternalInternalReactantFactory + Send + Sync>,
    /// Builds the raw-payload reactant (`ReactantRawErased`) installed on an external ganglion.
    pub external_external_factory: Arc<dyn ErasedExternalExternalReactantFactory + Send + Sync>,
}
119
/// A Plexus manages a network of Ganglia, arranging automatic transmission of types between different Ganglia.
/// This is the Rust equivalent of the Python Plexus class.
///
/// The shared collections are held behind `Arc` so that the `'static` futures returned by
/// `adapt` can own their own handles to them.
#[allow(clippy::type_complexity)]
pub struct Plexus {
    /// Unique identifier for this plexus instance (a v7 UUID assigned in `Plexus::new`)
    id: Uuid,

    /// In-process ganglion for handling local neurons; `Plexus::new` also registers it in
    /// `internal_ganglia` under its own id
    inproc_ganglion: Arc<Mutex<GanglionInprocess>>,

    /// External ganglia that handle encoded payloads, mapped by their unique_id
    external_ganglia:
        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>>,

    /// Internal ganglia that handle decoded payloads, mapped by their unique_id
    internal_ganglia:
        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>>,

    /// Set of neurons managed by this plexus, mapped by their name
    neurons: Arc<RwLock<HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>>>,

    /// Mapping of neuron strongly-typed reactant factories, keyed by neuron name
    reactant_factories: Arc<RwLock<HashMap<String, PlexusReactantFactories>>>,

    /// Mapping of neuron ganglion pairs: the (neuron name, ganglion id) combinations that
    /// have already been recorded, so only new combinations get reactants created
    neuron_ganglia: Arc<RwLock<HashSet<(String, Uuid)>>>,

    /// Reactions tracking for preventing loops - maps span_id to a set of ganglia IDs it has processed.
    /// Entries expire automatically; `Plexus::new` configures the cache with a time-to-idle
    /// of 60 seconds (idle time, not time since insertion).
    reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,

    /// Names of neurons that this plexus will handle (if empty, handles all)
    relevant_neurons: HashSet<String>,

    /// Names of neurons that this plexus will ignore
    ignored_neurons: HashSet<String>,
}
157impl Plexus {
158    /// Returns a read guard to the neurons map
159    pub async fn neurons(
160        &self,
161    ) -> tokio::sync::RwLockReadGuard<
162        '_,
163        HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>,
164    > {
165        self.neurons.read().await
166    }
167
168    /// Create a new Plexus with the given ganglia and neuron filters
169    pub async fn new(
170        relevant_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
171        ignored_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
172    ) -> Self {
173        let inproc_ganglion = Arc::new(Mutex::new(GanglionInprocess::new()));
174
175        // Convert Vec of neurons to HashSet of neuron names
176        let relevant_neuron_names = relevant_neurons
177            .iter()
178            .map(|n| n.name())
179            .collect::<HashSet<String>>();
180
181        let ignored_neuron_names = ignored_neurons
182            .iter()
183            .map(|n| n.name())
184            .collect::<HashSet<String>>();
185
186        let reactions = Cache::builder()
187            .time_to_idle(Duration::from_secs(60)) // Default TTL 60s
188            .build();
189
190        let plexus = Self {
191            id: Uuid::now_v7(),
192            inproc_ganglion: inproc_ganglion.clone(),
193            external_ganglia: Arc::new(RwLock::new(HashMap::new())),
194            internal_ganglia: Arc::new(RwLock::new(HashMap::new())),
195            neurons: Arc::new(RwLock::new(HashMap::new())),
196            reactant_factories: Arc::new(RwLock::new(HashMap::new())),
197            neuron_ganglia: Arc::new(RwLock::new(HashSet::new())),
198            reactions,
199            relevant_neurons: relevant_neuron_names,
200            ignored_neurons: ignored_neuron_names,
201        };
202
203        // Add the inproc_ganglion to internal_ganglia
204        let ganglion_id = {
205            let ganglion_guard = inproc_ganglion.lock().await;
206            ganglion_guard.unique_id()
207        };
208        plexus
209            .internal_ganglia
210            .write()
211            .await
212            .insert(ganglion_id, inproc_ganglion);
213
214        plexus
215    }
216
217    /// Helper to create an Arc<Mutex<Plexus>>
218    pub async fn new_shared(
219        relevant_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
220        ignored_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
221    ) -> Arc<Mutex<Self>> {
222        Arc::new(Mutex::new(
223            Self::new(relevant_neurons, ignored_neurons).await,
224        ))
225    }
226
227    /// Add a ganglion to the plexus
228    pub async fn infuse_ganglion<G>(&mut self, ganglion: Arc<Mutex<G>>) -> Result<(), PlexusError>
229    where
230        G: GanglionInternal + Ganglion + Send + Sync + 'static,
231    {
232        // Get the unique_id of the ganglion
233        let ganglion_id = {
234            let ganglion_guard = ganglion.lock().await;
235            ganglion_guard.unique_id()
236        };
237
238        self.internal_ganglia
239            .write()
240            .await
241            .insert(ganglion_id, ganglion.clone());
242
243        self.update_neuron_ganglia().await?;
244        Ok(())
245    }
246
247    /// Add an external ganglion to the plexus
248    pub async fn infuse_external_ganglion<G>(
249        &mut self,
250        ganglion: Arc<Mutex<G>>,
251    ) -> Result<(), PlexusError>
252    where
253        G: GanglionExternal + Send + Sync + 'static,
254    {
255        // Get the unique_id of the ganglion
256        let ganglion_id = {
257            let ganglion_guard = ganglion.lock().await;
258            ganglion_guard.unique_id()
259        };
260
261        self.external_ganglia
262            .write()
263            .await
264            .insert(ganglion_id, ganglion);
265
266        self.update_neuron_ganglia().await?;
267        Ok(())
268    }
269
270    /// Remove a ganglion from the plexus by its ID
271    pub async fn excise_ganglion_by_id(
272        &mut self,
273        ganglion_id: Uuid,
274    ) -> Result<Option<Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>, PlexusError> {
275        let removed = self.internal_ganglia.write().await.remove(&ganglion_id);
276
277        if removed.is_some() {
278            self.update_neuron_ganglia().await?;
279        }
280
281        Ok(removed)
282    }
283
284    /// Remove a ganglion from the plexus
285    pub async fn excise_ganglion<G>(
286        &mut self,
287        ganglion: Arc<Mutex<G>>,
288    ) -> Result<Option<Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>, PlexusError>
289    where
290        G: GanglionInternal + Send + Sync + ?Sized + 'static,
291    {
292        let ganglion_id = {
293            let guard = ganglion.lock().await;
294            guard.unique_id()
295        };
296        self.excise_ganglion_by_id(ganglion_id).await
297    }
298
299    /// Remove an external ganglion from the plexus by its ID
300    pub async fn excise_external_ganglion_by_id(
301        &mut self,
302        ganglion_id: Uuid,
303    ) -> Result<Option<Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>, PlexusError> {
304        let removed = self.external_ganglia.write().await.remove(&ganglion_id);
305
306        if removed.is_some() {
307            self.update_neuron_ganglia().await?;
308        }
309
310        Ok(removed)
311    }
312
313    /// Remove an external ganglion from the plexus
314    pub async fn excise_external_ganglion<G>(
315        &mut self,
316        ganglion: Arc<Mutex<G>>,
317    ) -> Result<Option<Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>, PlexusError>
318    where
319        G: GanglionExternal + Send + Sync + ?Sized + 'static,
320    {
321        let ganglion_id = {
322            let guard = ganglion.lock().await;
323            guard.unique_id()
324        };
325        self.excise_external_ganglion_by_id(ganglion_id).await
326    }
327
328    /// Update neurons with a new neuron
329    pub async fn update_neurons(
330        &self,
331        neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
332    ) -> Result<(), PlexusError> {
333        let mut neurons = self.neurons.write().await;
334        neurons.insert(neuron.name(), neuron);
335        drop(neurons);
336
337        // Update neuron-ganglion mappings
338        self.update_neuron_ganglia().await?;
339        Ok(())
340    }
341
342    /// Update neuron-ganglion mappings for all neurons and ganglia
343    async fn update_neuron_ganglia(&self) -> Result<(), PlexusError> {
344        update_neuron_ganglia_internal(
345            &self.neurons,
346            &self.internal_ganglia,
347            &self.external_ganglia,
348            &self.neuron_ganglia,
349            &self.reactant_factories,
350            &self.reactions,
351        )
352        .await
353    }
354}
355
356impl Ganglion for Plexus {
357    fn capable<T, C>(&mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
358    where
359        C: Codec<T> + CodecName + Send + Sync + 'static,
360        T: Send + Sync + 'static,
361    {
362        let neuron_name = neuron.name();
363
364        // If relevant_neurons is not empty, check if this neuron is in it
365        if !self.relevant_neurons.is_empty() && !self.relevant_neurons.contains(&neuron_name) {
366            return false;
367        }
368
369        // If this neuron is in ignored_neurons, return false
370        if self.ignored_neurons.contains(&neuron_name) {
371            return false;
372        }
373
374        true
375    }
376
377    fn adapt<T, C>(
378        &mut self,
379
380        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
381    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>
382    where
383        C: Codec<T> + CodecName + Send + Sync + 'static,
384        T: Send + Sync + 'static,
385    {
386        tracing::debug!(neuron = %neuron.name(), "Plexus::adapt - Adapting neuron");
387
388        // Check if we can handle this neuron
389
390        if !self.capable(neuron.clone()) {
391            return Box::pin(async move { Ok(()) });
392        }
393
394        let inproc_ganglion = self.inproc_ganglion.clone();
395
396        let erased_neuron = erase_neuron(neuron.clone());
397
398        let reactions = self.reactions.clone();
399
400        let reactant_factories = self.reactant_factories.clone();
401
402        let neurons = self.neurons.clone();
403
404        let id = self.id;
405
406        let internal_ganglia = self.internal_ganglia.clone();
407
408        let external_ganglia = self.external_ganglia.clone();
409
410        let neuron_ganglia = self.neuron_ganglia.clone();
411
412        // We can't easily get a payload here to create a span, and this is a configuration step.
413        // We will just use a standard debug log. Adaptation is generally low-frequency.
414        Box::pin(async move {
415            let future = {
416                let mut inproc_ganglion_guard = inproc_ganglion.lock().await;
417
418                inproc_ganglion_guard.adapt(neuron.clone())
419            };
420
421            let result = future.await;
422
423            let factories = PlexusReactantFactories {
424                internal_factory: Arc::new(PlexusInternalReactantFactory::<T, C>::new()),
425
426                external_internal_factory: Arc::new(
427                    PlexusExternalInternalReactantFactory::<T, C>::new(),
428                ),
429
430                external_external_factory: Arc::new(
431                    PlexusExternalExternalReactantFactory::<T, C>::new(),
432                ),
433            };
434
435            reactant_factories
436                .write()
437                .await
438                .insert(neuron.name(), factories);
439
440            // Update the neurons set
441
442            if let Err(plexus_error) = update_neurons_internal(
443                &neurons,
444                &internal_ganglia,
445                &external_ganglia,
446                &neuron_ganglia,
447                &reactant_factories,
448                &reactions,
449                erased_neuron,
450            )
451            .await
452            {
453                // Convert PlexusError to GanglionError and return it
454
455                return Err(GanglionError::from_plexus_error(
456                    plexus_error,
457                    neuron.name(),
458                    "Plexus".to_string(),
459                    id,
460                ));
461            }
462
463            result
464        })
465    }
466}
467
468#[allow(clippy::type_complexity)]
469async fn update_neurons_internal(
470    neurons_map: &Arc<RwLock<HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>>>,
471
472    internal_ganglia: &Arc<
473        RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
474    >,
475
476    external_ganglia: &Arc<
477        RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
478    >,
479
480    neuron_ganglia: &Arc<RwLock<HashSet<(String, Uuid)>>>,
481
482    reactant_factories: &Arc<RwLock<HashMap<String, PlexusReactantFactories>>>,
483
484    reactions: &Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
485
486    neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
487) -> Result<(), PlexusError> {
488    let mut neurons = neurons_map.write().await;
489
490    neurons.insert(neuron.name(), neuron);
491
492    drop(neurons);
493
494    // Update neuron-ganglion mappings
495
496    update_neuron_ganglia_internal(
497        neurons_map,
498        internal_ganglia,
499        external_ganglia,
500        neuron_ganglia,
501        reactant_factories,
502        reactions,
503    )
504    .await?;
505
506    Ok(())
507}
508
509#[allow(clippy::type_complexity)]
510async fn update_neuron_ganglia_internal(
511    neurons_map: &Arc<RwLock<HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>>>,
512
513    internal_ganglia_map: &Arc<
514        RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
515    >,
516
517    external_ganglia_map: &Arc<
518        RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
519    >,
520
521    neuron_ganglia_set: &Arc<RwLock<HashSet<(String, Uuid)>>>,
522
523    reactant_factories_map: &Arc<RwLock<HashMap<String, PlexusReactantFactories>>>,
524
525    reactions: &Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
526) -> Result<(), PlexusError> {
527    let neurons = neurons_map.read().await;
528
529    let internal_ganglia = internal_ganglia_map.read().await;
530
531    let external_ganglia = external_ganglia_map.read().await;
532
533    // Create all combinations of neurons and internal ganglia using cartesian product
534
535    let all_internal_neuron_ganglia: HashSet<(String, Uuid)> = neurons
536        .iter()
537        .cartesian_product(internal_ganglia.iter())
538        .map(|((neuron_name, _neuron), (ganglion_id, _ganglion_mutex))| {
539            (neuron_name.clone(), *ganglion_id)
540        })
541        .collect();
542
543    // Create all combinations of neurons and external ganglia using cartesian product
544
545    let all_external_neuron_ganglia: HashSet<(String, Uuid)> = neurons
546        .iter()
547        .cartesian_product(external_ganglia.iter())
548        .map(|((neuron_name, _neuron), (ganglion_id, _ganglion_mutex))| {
549            (neuron_name.clone(), *ganglion_id)
550        })
551        .collect();
552
553    let mut neuron_ganglia = neuron_ganglia_set.write().await;
554
555    let new_internal_combinations: Vec<(String, Uuid)> = all_internal_neuron_ganglia
556        .difference(&neuron_ganglia)
557        .cloned()
558        .collect();
559
560    let new_external_combinations: Vec<(String, Uuid)> = all_external_neuron_ganglia
561        .difference(&neuron_ganglia)
562        .cloned()
563        .collect();
564
565    *neuron_ganglia = all_internal_neuron_ganglia
566        .union(&all_external_neuron_ganglia)
567        .cloned()
568        .collect();
569
570    drop(neuron_ganglia);
571
572    // Process new internal combinations - group by ganglion_id
573
574    let mut internal_reactants_by_ganglion: HashMap<
575        (String, Uuid),
576        Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
577    > = HashMap::new();
578
579    for (neuron_name, ganglion_id) in new_internal_combinations {
580        let internal_reactant = reactant_factories_map
581            .read()
582            .await
583            .get(&neuron_name)
584            .ok_or_else(|| PlexusError::ReactantCreation {
585                neuron_name: neuron_name.clone(),
586            })?
587            .internal_factory
588            .create_reactant(
589                ganglion_id,
590                internal_ganglia_map.clone(),
591                external_ganglia_map.clone(),
592                reactions.clone(),
593            );
594
595        internal_reactants_by_ganglion
596            .entry((neuron_name, ganglion_id))
597            .or_default()
598            .push(internal_reactant);
599    }
600
601    // Call react once per ganglion with all collected reactants
602
603    for ((neuron_name, ganglion_id), reactants) in internal_reactants_by_ganglion {
604        if let Some(ganglion_mutex) = internal_ganglia.get(&ganglion_id) {
605            let mut ganglion = ganglion_mutex.lock().await;
606
607            // Call react once per neuron with all collected reactants for that neuron
608
609            ganglion.react(neuron_name, reactants, vec![]).await?;
610        }
611    }
612
613    // Process new external combinations - group by ganglion_id
614
615    #[allow(clippy::type_complexity)]
616    let mut external_reactants_by_ganglion: HashMap<
617        (String, Uuid),
618        (
619            Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
620            Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
621        ),
622    > = HashMap::new();
623
624    for (neuron_name, ganglion_id) in new_external_combinations {
625        let external_internal_reactant = reactant_factories_map
626            .read()
627            .await
628            .get(&neuron_name)
629            .ok_or_else(|| PlexusError::ReactantCreation {
630                neuron_name: neuron_name.clone(),
631            })?
632            .external_internal_factory
633            .create_reactant(ganglion_id, internal_ganglia_map.clone(), reactions.clone());
634
635        let external_external_reactant = reactant_factories_map
636            .read()
637            .await
638            .get(&neuron_name)
639            .ok_or_else(|| PlexusError::ReactantCreation {
640                neuron_name: neuron_name.clone(),
641            })?
642            .external_external_factory
643            .create_reactant(ganglion_id, external_ganglia_map.clone(), reactions.clone());
644
645        let entry = external_reactants_by_ganglion
646            .entry((neuron_name.clone(), ganglion_id))
647            .or_default();
648
649        entry.0.push(external_internal_reactant);
650
651        entry.1.push(external_external_reactant);
652    }
653
654    // Call react once per ganglion with all collected reactants
655
656    for ((neuron_name, ganglion_id), (reactants, reactants_raw)) in external_reactants_by_ganglion {
657        if let Some(ganglion_mutex) = external_ganglia.get(&ganglion_id) {
658            let mut ganglion = ganglion_mutex.lock().await;
659
660            ganglion
661                .react(neuron_name, reactants, reactants_raw, vec![])
662                .await?;
663        }
664    }
665
666    Ok(())
667}
668
669impl GanglionInternal for Plexus {
670    fn transmit(
671        &mut self,
672
673        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
674    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>> {
675        let inproc_ganglion = self.inproc_ganglion.clone();
676
677        Box::pin(async move {
678            let future = {
679                let mut ganglion = inproc_ganglion.lock().await;
680
681                ganglion.transmit(payload)
682            };
683
684            future.await
685        })
686    }
687
688    fn react(
689        &mut self,
690        neuron_name: String,
691        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
692        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
693    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
694        let inproc_ganglion = self.inproc_ganglion.clone();
695
696        Box::pin(async move {
697            let future = {
698                let mut ganglion = inproc_ganglion.lock().await;
699
700                ganglion.react(neuron_name, reactants, error_reactants)
701            };
702
703            future.await
704        })
705    }
706
707    fn unique_id(&self) -> Uuid {
708        self.id
709    }
710}
711
712#[cfg(test)]
713mod tests {
714    use super::*;
715    use crate::erasure::payload::{erase_payload, erase_payload_raw};
716    use crate::erasure::reactant::{erase_reactant, erase_reactant_raw};
717    use crate::logging::TraceContext;
718    use crate::neuron::NeuronImpl;
719    use crate::payload::{Payload, PayloadRaw};
720    use crate::test_utils::{
721        DebugCodec, DebugStruct, GanglionExternalInprocess, TokioMpscReactant,
722        TokioMpscReactantRaw, test_namespace,
723    };
724    use tokio::sync::mpsc::channel;
725    use uuid::Uuid;
726
727    #[test]
728    fn test_cartesian_product_equivalence() {
729        // Test that cartesian product produces same results as nested loops
730        let neurons = [("neuron1", "Neuron1"), ("neuron2", "Neuron2")];
731        let ganglia = [("ganglion1", "uuid1"), ("ganglion2", "uuid2")];
732
733        // Old approach with nested loops
734        let mut old_combinations = HashSet::new();
735        for (neuron_name, _neuron) in neurons.iter() {
736            for (ganglion_id, _ganglion_mutex) in ganglia.iter() {
737                old_combinations.insert((neuron_name.to_string(), ganglion_id.to_string()));
738            }
739        }
740
741        // New approach with cartesian product
742        let new_combinations: HashSet<(String, String)> = neurons
743            .iter()
744            .cartesian_product(ganglia.iter())
745            .map(|((neuron_name, _neuron), (ganglion_id, _ganglion_mutex))| {
746                (neuron_name.to_string(), ganglion_id.to_string())
747            })
748            .collect();
749
750        // Verify they produce the same results
751        assert_eq!(old_combinations, new_combinations);
752        assert_eq!(old_combinations.len(), 4); // 2 neurons × 2 ganglia = 4 combinations
753    }
754
755    #[tokio::test]
756    async fn test_plexus_creation() {
757        let plexus: Plexus = Plexus::new(vec![], vec![]).await;
758
759        assert_eq!(plexus.relevant_neurons.len(), 0);
760        assert_eq!(plexus.ignored_neurons.len(), 0);
761    }
762
763    #[tokio::test]
764    async fn test_plexus_capable() {
765        let ns = test_namespace();
766        let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;
767
768        let neuron: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
769        let neuron_arc = Arc::new(neuron);
770
771        assert!(plexus.capable(neuron_arc));
772    }
773
774    #[tokio::test]
775    async fn test_plexus_adapt() {
776        let ns = test_namespace();
777        let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;
778
779        let neuron: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
780
781        plexus
782            .adapt(Arc::new(neuron))
783            .await
784            .expect("Failed to adapt neuron");
785
786        let neurons = plexus.neurons.read().await;
787        assert_eq!(neurons.len(), 1);
788    }
789
790    #[tokio::test]
791    async fn test_plexus_ganglion_internal_adapt() {
792        let ns = test_namespace();
793        let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;
794
795        let neuron: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
796        let neuron_arc = Arc::new(neuron);
797
798        // Check if plexus can handle this neuron
799        assert!(plexus.capable(neuron_arc.clone()));
800
801        // Test the Ganglion::adapt implementation
802        plexus
803            .adapt(neuron_arc)
804            .await
805            .expect("Failed to adapt neuron");
806
807        let neurons = plexus.neurons.read().await;
808        assert_eq!(neurons.len(), 1);
809    }
810
    /// End-to-end check of the two in-process test ganglia: after adapting a neuron and
    /// registering channel-backed reactants, a decoded payload sent through
    /// `GanglionInprocess` and a raw payload sent through `GanglionExternalInprocess`
    /// each reach both of their reactants.
    ///
    /// Note: despite the name, no `Plexus` is constructed here; the ganglia are driven
    /// directly.
    #[tokio::test]
    async fn test_plexus_external_inprocess_transmit_via_adapt() {
        let ns = test_namespace();

        // Two decoded-payload channels and two raw-payload channels; each sender is
        // wrapped in a reactant below and each receiver is checked at the end.
        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
        let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
        let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
        let neuron_arc = neuron_impl.clone_to_arc();

        // Create GanglionInprocess and GanglionExternalInprocess
        let mut ganglion_inprocess = GanglionInprocess::new();
        let mut ganglion_external_inprocess = GanglionExternalInprocess::new();

        // Adapt both ganglia
        // NOTE(review): the in-process adapt result is discarded, while the external one
        // is asserted with `expect` — confirm the asymmetry is intentional.
        let _ = ganglion_inprocess.adapt(neuron_arc.clone()).await;
        ganglion_external_inprocess
            .adapt(neuron_arc.clone())
            .await
            .expect("Failed to adapt neuron");

        // Convert reactants to erased types and react
        let erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
            erase_reactant(Box::new(TokioMpscReactant::new(tx1))),
            erase_reactant(Box::new(TokioMpscReactant::new(tx2))),
        ];

        let erased_raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
            erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx1))),
            erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx2))),
        ];

        // React with both ganglia: decoded reactants go to the in-process ganglion, raw
        // reactants to the external one (which receives no decoded reactants here).
        ganglion_inprocess
            .react(neuron_arc.name(), erased_reactants, vec![])
            .await
            .expect("Failed to react");
        ganglion_external_inprocess
            .react(neuron_arc.name(), vec![], erased_raw_reactants, vec![])
            .await
            .expect("Failed to react raw");

        // Create test payload
        let test_payload = Payload::with_correlation(
            DebugStruct {
                foo: 42,
                bar: "test".to_string(),
            },
            neuron_arc.clone(),
            None,
        );

        // The raw payload carries the encoded form of the same value.
        let test_payload_raw = PayloadRaw::with_correlation(
            DebugCodec::encode(&DebugStruct {
                foo: 42,
                bar: "test".to_string(),
            })
            .expect("Failed to encode test data"),
            Some(neuron_arc.clone()),
            None,
        );

        // Transmit via ganglion_inprocess
        let erased_payload = erase_payload(test_payload);
        ganglion_inprocess
            .transmit(erased_payload)
            .await
            .expect("Failed to transmit");

        // Transmit via ganglion_external_inprocess
        let erased_payload_raw = erase_payload_raw(test_payload_raw);
        ganglion_external_inprocess
            .transmit_encoded(erased_payload_raw)
            .await
            .expect("Failed to transmit encoded");

        // Verify that payloads were received
        let received1 = rx1.recv().await.expect("Failed to receive payload 1");
        let received2 = rx2.recv().await.expect("Failed to receive payload 2");
        let raw_received1 = raw_rx1
            .recv()
            .await
            .expect("Failed to receive raw payload 1");
        let raw_received2 = raw_rx2
            .recv()
            .await
            .expect("Failed to receive raw payload 2");

        assert_eq!(received1.value.foo, 42);
        assert_eq!(received2.value.foo, 42);
        // For raw payloads, we need to decode the bytes first
        let decoded1 =
            DebugCodec::decode(&raw_received1.value).expect("Failed to decode raw payload 1");
        let decoded2 =
            DebugCodec::decode(&raw_received2.value).expect("Failed to decode raw payload 2");
        assert_eq!(decoded1.foo, 42);
        assert_eq!(decoded2.foo, 42);
    }
910
911    #[tokio::test]
912    async fn test_plexus_external_inprocess_across_threads() {
913        let ns = test_namespace();
914
915        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
916        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
917        let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
918        let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
919        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
920        let neuron_arc = neuron_impl.clone_to_arc();
921
922        // Create GanglionInprocess and GanglionExternalInprocess
923        let ganglion_inprocess = Arc::new(Mutex::new(GanglionInprocess::new()));
924        let ganglion_external_inprocess = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
925
926        // Adapt both ganglia
927        {
928            let mut ganglion = ganglion_inprocess.lock().await;
929            let _ = ganglion.adapt(neuron_arc.clone()).await;
930        }
931        {
932            let mut ganglion = ganglion_external_inprocess.lock().await;
933            ganglion
934                .adapt(neuron_arc.clone())
935                .await
936                .expect("Failed to adapt neuron");
937        }
938
939        // Convert reactants to erased types and react
940        let erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
941            erase_reactant(Box::new(TokioMpscReactant::new(tx1))),
942            erase_reactant(Box::new(TokioMpscReactant::new(tx2))),
943        ];
944
945        let erased_raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
946            erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx1))),
947            erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx2))),
948        ];
949
950        // React with both ganglia
951        {
952            let mut ganglion = ganglion_inprocess.lock().await;
953            ganglion
954                .react(neuron_arc.name(), erased_reactants, vec![])
955                .await
956                .expect("Failed to react");
957        }
958        {
959            let mut ganglion = ganglion_external_inprocess.lock().await;
960            ganglion
961                .react(neuron_arc.name(), vec![], erased_raw_reactants, vec![])
962                .await
963                .expect("Failed to react raw");
964        }
965
966        // Spawn tasks to transmit payloads
967        let ganglion_inprocess_clone = ganglion_inprocess.clone();
968        let ganglion_external_inprocess_clone = ganglion_external_inprocess.clone();
969
970        let neuron_arc_clone = neuron_arc.clone();
971        let task1 = tokio::task::spawn(async move {
972            let test_payload = Payload::with_correlation(
973                DebugStruct {
974                    foo: 42,
975                    bar: "test".to_string(),
976                },
977                neuron_arc_clone.clone(),
978                None,
979            );
980
981            let erased_payload = erase_payload(test_payload);
982            let mut ganglion = ganglion_inprocess_clone.lock().await;
983            ganglion
984                .transmit(erased_payload)
985                .await
986                .expect("Failed to transmit");
987        });
988
989        let neuron_arc_clone2 = neuron_arc.clone();
990        let task2 = tokio::task::spawn(async move {
991            let test_payload_raw = PayloadRaw::with_correlation(
992                DebugCodec::encode(&DebugStruct {
993                    foo: 42,
994                    bar: "test".to_string(),
995                })
996                .expect("Failed to encode test data"),
997                Some(neuron_arc_clone2.clone()),
998                None,
999            );
1000
1001            let erased_payload_raw = erase_payload_raw(test_payload_raw);
1002            let mut ganglion = ganglion_external_inprocess_clone.lock().await;
1003            ganglion
1004                .transmit_encoded(erased_payload_raw)
1005                .await
1006                .expect("Failed to transmit encoded");
1007        });
1008
1009        // Wait for tasks to complete
1010        task1.await.expect("Task 1 failed");
1011        task2.await.expect("Task 2 failed");
1012
1013        // Verify that payloads were received
1014        let received1 = rx1.recv().await.expect("Failed to receive payload 1");
1015        let received2 = rx2.recv().await.expect("Failed to receive payload 2");
1016        let raw_received1 = raw_rx1
1017            .recv()
1018            .await
1019            .expect("Failed to receive raw payload 1");
1020        let raw_received2 = raw_rx2
1021            .recv()
1022            .await
1023            .expect("Failed to receive raw payload 2");
1024
1025        assert_eq!(received1.value.foo, 42);
1026        assert_eq!(received2.value.foo, 42);
1027        // For raw payloads, we need to decode the bytes first
1028        let decoded1 =
1029            DebugCodec::decode(&raw_received1.value).expect("Failed to decode raw payload 1");
1030        let decoded2 =
1031            DebugCodec::decode(&raw_received2.value).expect("Failed to decode raw payload 2");
1032        assert_eq!(decoded1.foo, 42);
1033        assert_eq!(decoded2.foo, 42);
1034    }
1035
1036    #[tokio::test]
1037    async fn test_plexus_transmit_encoded_external_to_internal() {
1038        use crate::erasure::neuron::erase_neuron;
1039        use crate::erasure::payload::erase_payload_raw;
1040        use crate::erasure::reactant::erase_reactant;
1041        use crate::neuron::NeuronImpl;
1042        use crate::payload::PayloadRaw;
1043        use crate::test_utils::{
1044            DebugCodec, DebugStruct, GanglionExternalInprocess, TokioMpscReactant, test_namespace,
1045        };
1046        use tokio::sync::mpsc::channel;
1047        use tokio::time::{Duration, sleep};
1048
1049        println!("[DEBUG] TEST - Starting test_plexus_transmit_encoded_external_to_internal");
1050
1051        let ns = test_namespace();
1052        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1053        let neuron_arc = neuron_impl.clone_to_arc();
1054
1055        // Create channel to receive payload at internal plexus
1056        let (tx_internal, mut rx_internal) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
1057
1058        // Create external ganglion and internal plexus
1059        let external_ganglion = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
1060        let mut internal_plexus = Plexus::new(vec![erase_neuron(neuron_arc.clone())], vec![]).await;
1061
1062        // Adapt both with the neuron
1063        {
1064            let mut g = external_ganglion.lock().await;
1065            g.adapt(neuron_arc.clone())
1066                .await
1067                .expect("Failed to adapt neuron to external ganglion");
1068        }
1069        internal_plexus
1070            .adapt(neuron_arc.clone())
1071            .await
1072            .expect("Failed to adapt neuron to internal plexus");
1073
1074        // Add a reactant to the internal plexus to capture received payloads
1075        let reactants = vec![erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(
1076            TokioMpscReactant::new(tx_internal),
1077        ))];
1078        internal_plexus
1079            .react(neuron_arc.name(), reactants, vec![])
1080            .await
1081            .expect("Failed to add reactant to internal plexus");
1082
1083        // Infuse the external ganglion into the plexus
1084        let _ = internal_plexus
1085            .infuse_external_ganglion(external_ganglion.clone())
1086            .await;
1087
1088        // Create test data and encode it
1089        let test_data = DebugStruct {
1090            foo: 123,
1091            bar: "plexus_transmit_encoded_test".to_string(),
1092        };
1093        let encoded_data = neuron_impl
1094            .encode(&test_data)
1095            .expect("Failed to encode test data");
1096        let correlation_id = Uuid::now_v7();
1097
1098        // Create PayloadRaw with encoded data
1099        let span_id = Uuid::now_v7().as_u128() as u64;
1100        let payload_raw = Arc::new(PayloadRaw {
1101            value: Arc::new(encoded_data),
1102            neuron: Some(neuron_arc.clone()),
1103            trace: TraceContext::from_parts(correlation_id, span_id, None),
1104        });
1105
1106        // Use transmit_encoded on the external ganglion
1107        println!("[DEBUG] TEST - About to call transmit_encoded on external ganglion");
1108        {
1109            let mut g = external_ganglion.lock().await;
1110            let erased_payload_raw = erase_payload_raw(payload_raw.clone());
1111            g.transmit_encoded(erased_payload_raw)
1112                .await
1113                .expect("Failed to transmit encoded payload");
1114        }
1115        println!("[DEBUG] TEST - Completed transmit_encoded call");
1116
1117        // Give some time for async transmission to complete
1118        sleep(Duration::from_millis(100)).await;
1119
1120        // Verify that the internal plexus received the decoded payload
1121        assert_eq!(
1122            rx_internal.len(),
1123            1,
1124            "Internal plexus should have received exactly one payload"
1125        );
1126        let received_payload = rx_internal
1127            .recv()
1128            .await
1129            .expect("Should receive decoded payload in internal plexus");
1130
1131        // Verify the payload was correctly decoded
1132        assert_eq!(
1133            received_payload.value.foo, test_data.foo,
1134            "Decoded payload should have correct foo value"
1135        );
1136        assert_eq!(
1137            received_payload.value.bar, test_data.bar,
1138            "Decoded payload should have correct bar value"
1139        );
1140        assert_eq!(
1141            received_payload.correlation_id(), correlation_id,
1142            "Correlation ID should match"
1143        );
1144
1145        println!(
1146            "[DEBUG] TEST - Successfully verified that PayloadRaw from transmit_encoded was received as decoded Payload in Plexus"
1147        );
1148    }
1149
1150    #[tokio::test]
1151    async fn test_plexus_ganglion_excision() {
1152        let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;
1153
1154        // Test internal ganglion excision (by ID)
1155        let internal_ganglion_1 = Arc::new(Mutex::new(GanglionInprocess::new()));
1156        let internal_id_1 = { internal_ganglion_1.lock().await.unique_id() };
1157
1158        plexus
1159            .infuse_ganglion(internal_ganglion_1.clone())
1160            .await
1161            .expect("Failed to infuse internal ganglion 1");
1162
1163        {
1164            let internal_ganglia = plexus.internal_ganglia.read().await;
1165            assert!(internal_ganglia.contains_key(&internal_id_1));
1166        }
1167
1168        let excised_internal_1 = plexus
1169            .excise_ganglion_by_id(internal_id_1)
1170            .await
1171            .expect("Failed to excise internal ganglion 1 by ID");
1172        assert!(excised_internal_1.is_some());
1173        assert_eq!(
1174            excised_internal_1.unwrap().lock().await.unique_id(),
1175            internal_id_1
1176        );
1177
1178        {
1179            let internal_ganglia = plexus.internal_ganglia.read().await;
1180            assert!(!internal_ganglia.contains_key(&internal_id_1));
1181        }
1182
1183        // Test internal ganglion excision (by instance)
1184        let internal_ganglion_2 = Arc::new(Mutex::new(GanglionInprocess::new()));
1185        let internal_id_2 = { internal_ganglion_2.lock().await.unique_id() };
1186
1187        plexus
1188            .infuse_ganglion(internal_ganglion_2.clone())
1189            .await
1190            .expect("Failed to infuse internal ganglion 2");
1191
1192        let excised_internal_2 = plexus
1193            .excise_ganglion(internal_ganglion_2.clone())
1194            .await
1195            .expect("Failed to excise internal ganglion 2 by instance");
1196        assert!(excised_internal_2.is_some());
1197        assert_eq!(
1198            excised_internal_2.unwrap().lock().await.unique_id(),
1199            internal_id_2
1200        );
1201
1202        {
1203            let internal_ganglia = plexus.internal_ganglia.read().await;
1204            assert!(!internal_ganglia.contains_key(&internal_id_2));
1205        }
1206
1207        // Test external ganglion excision (by ID)
1208        let external_ganglion_1 = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
1209        let external_id_1 = { external_ganglion_1.lock().await.unique_id() };
1210
1211        plexus
1212            .infuse_external_ganglion(external_ganglion_1.clone())
1213            .await
1214            .expect("Failed to infuse external ganglion 1");
1215
1216        {
1217            let external_ganglia = plexus.external_ganglia.read().await;
1218            assert!(external_ganglia.contains_key(&external_id_1));
1219        }
1220
1221        let excised_external_1 = plexus
1222            .excise_external_ganglion_by_id(external_id_1)
1223            .await
1224            .expect("Failed to excise external ganglion 1 by ID");
1225        assert!(excised_external_1.is_some());
1226        assert_eq!(
1227            excised_external_1.unwrap().lock().await.unique_id(),
1228            external_id_1
1229        );
1230
1231        {
1232            let external_ganglia = plexus.external_ganglia.read().await;
1233            assert!(!external_ganglia.contains_key(&external_id_1));
1234        }
1235
1236        // Test external ganglion excision (by instance)
1237        let external_ganglion_2 = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
1238        let external_id_2 = { external_ganglion_2.lock().await.unique_id() };
1239
1240        plexus
1241            .infuse_external_ganglion(external_ganglion_2.clone())
1242            .await
1243            .expect("Failed to infuse external ganglion 2");
1244
1245        let excised_external_2 = plexus
1246            .excise_external_ganglion(external_ganglion_2.clone())
1247            .await
1248            .expect("Failed to excise external ganglion 2 by instance");
1249        assert!(excised_external_2.is_some());
1250        assert_eq!(
1251            excised_external_2.unwrap().lock().await.unique_id(),
1252            external_id_2
1253        );
1254
1255        {
1256            let external_ganglia = plexus.external_ganglia.read().await;
1257            assert!(!external_ganglia.contains_key(&external_id_2));
1258        }
1259    }
1260
1261    #[tokio::test]
1262    async fn test_weak_key_hash_map_reaction_cleanup() {
1263        // This test is obsolete as we switched to Moka Cache and removed reaction_id
1264    }
1265}