Skip to main content

plexor_core/
ganglion.rs

1// Copyright 2025 Alecks Gates
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use crate::codec::{Codec, CodecName};
8use crate::erasure::payload::{PayloadErased, PayloadRawErased};
9use crate::erasure::reactant::{ErrorReactantErased, ReactantErased, ReactantRawErased};
10use crate::erasure::synapse::{SynapseInternalErased, erase_synapse_internal};
11use crate::logging::LogTrace;
12use crate::neuron::{Neuron, NeuronError};
13use crate::reactant::Reactant;
14use crate::synapse::SynapseInprocess;
15use crate::utils::struct_name_of_type;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20use std::sync::{Arc, RwLock};
21use thiserror::Error;
22use tokio::sync::Mutex;
23use tracing::Instrument;
24use uuid::Uuid;
25
26#[derive(Error, Debug)]
27pub enum GanglionError {
28    #[error("Synapse not found: {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
29    SynapseNotFound {
30        neuron_name: String,
31        ganglion_name: String,
32        ganglion_id: Uuid,
33    },
34    #[error(
35        "Failed to acquire lock on synapse: {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})"
36    )]
37    SynapseLock {
38        neuron_name: String,
39        ganglion_name: String,
40        ganglion_id: Uuid,
41    },
42    #[error(
43        "Transmission error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id}): {message}"
44    )]
45    Transmit {
46        neuron_name: String,
47        ganglion_name: String,
48        ganglion_id: Uuid,
49        message: String,
50    },
51    #[error("Encode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
52    Encode {
53        neuron_name: String,
54        ganglion_name: String,
55        ganglion_id: Uuid,
56    },
57    #[error("Decode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
58    Decode {
59        neuron_name: String,
60        ganglion_name: String,
61        ganglion_id: Uuid,
62    },
63    #[error(
64        "Adaptation error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})"
65    )]
66    Adapt {
67        neuron_name: String,
68        ganglion_name: String,
69        ganglion_id: Uuid,
70    },
71    #[error("Queue full for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
72    QueueFull {
73        neuron_name: String,
74        ganglion_name: String,
75        ganglion_id: Uuid,
76    },
77}
78
79impl GanglionError {
80    /// Convert a NeuronError to a GanglionError with additional context
81    pub fn from_neuron_error(
82        neuron_error: NeuronError,
83        ganglion_name: String,
84        ganglion_id: Uuid,
85    ) -> Self {
86        match neuron_error {
87            NeuronError::Encode { neuron_name, .. } => GanglionError::Encode {
88                neuron_name,
89                ganglion_name,
90                ganglion_id,
91            },
92            NeuronError::Decode { neuron_name, .. } => GanglionError::Decode {
93                neuron_name,
94                ganglion_name,
95                ganglion_id,
96            },
97        }
98    }
99
100    /// Convert a PlexusError to a GanglionError with additional context
101    pub fn from_plexus_error(
102        plexus_error: crate::plexus::PlexusError,
103        neuron_name: String,
104        ganglion_name: String,
105        ganglion_id: Uuid,
106    ) -> Self {
107        match plexus_error {
108            // If the PlexusError already contains a GanglionError, return it as-is
109            crate::plexus::PlexusError::Ganglion(ganglion_error) => ganglion_error,
110            // For all other PlexusError variants, convert to Adapt error
111            _ => GanglionError::Adapt {
112                neuron_name,
113                ganglion_name,
114                ganglion_id,
115            },
116        }
117    }
118}
119
120pub trait Ganglion {
121    fn capable<T, C>(&mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
122    where
123        C: Codec<T> + CodecName + Send + Sync + 'static,
124        T: Send + Sync + 'static;
125
126    fn adapt<T, C>(
127        &mut self,
128        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
129    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>
130    where
131        C: Codec<T> + CodecName + Send + Sync + 'static,
132        T: Send + Sync + 'static;
133}
134
135pub trait GanglionInternal {
136    fn transmit(
137        &mut self,
138        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
139    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>>;
140
141    fn react(
142        &mut self,
143        neuron_name: String,
144        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
145        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
146    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>;
147
148    fn unique_id(&self) -> Uuid;
149}
150
151pub trait GanglionExternal {
152    fn transmit(
153        &mut self,
154        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
155    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>>;
156
157    #[allow(clippy::type_complexity)]
158    fn transmit_encoded(
159        &mut self,
160        payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
161    ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), GanglionError>> + Send + 'static>>;
162
163    fn react(
164        &mut self,
165        neuron_name: String,
166        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
167        raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
168        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
169    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>;
170
171    fn unique_id(&self) -> Uuid;
172}
173
174/// Helper function to adapt a single neuron to multiple ganglia at once.
175/// Note: All ganglia in the slice must be of the same type G.
176/// For heterogeneous ganglia, use the `adapt_all!` macro instead.
177pub async fn adapt_all<T, C, G>(
178    ganglia: &[Arc<Mutex<G>>],
179    neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
180) -> Result<(), GanglionError>
181where
182    C: Codec<T> + CodecName + Send + Sync + 'static,
183    T: Send + Sync + 'static,
184    G: Ganglion + ?Sized,
185{
186    for ganglion_mutex in ganglia {
187        let mut ganglion = ganglion_mutex.lock().await;
188        ganglion.adapt(neuron.clone()).await?;
189    }
190    Ok(())
191}
192
193/// Macro to adapt a single neuron to multiple heterogeneous ganglia at once.
194///
195/// # Examples
196///
197/// ```ignore
198/// adapt_all!(neuron, plexus_arc, transport_arc).await?;
199/// ```
200#[macro_export]
201macro_rules! adapt_all {
202    ($neuron:expr, $($ganglion:expr),+ $(,)?) => {
203        async {
204            $(
205                $ganglion.lock().await.adapt($neuron.clone()).await?;
206            )+
207            Result::<(), $crate::ganglion::GanglionError>::Ok(())
208        }
209    };
210}
211
212pub struct GanglionInprocess {
213    id: Uuid,
214    synapses_by_name:
215        HashMap<String, Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>>,
216    /// Neurons that this ganglion will handle (if empty, handles all)
217    relevant_neurons: HashSet<String>,
218    /// Neurons that this ganglion will ignore
219    ignored_neurons: HashSet<String>,
220}
221
222impl GanglionInprocess {
223    pub fn new() -> Self {
224        Self {
225            id: Uuid::now_v7(),
226            synapses_by_name: HashMap::new(),
227            relevant_neurons: HashSet::new(),
228            ignored_neurons: HashSet::new(),
229        }
230    }
231
232    /// Helper to create an Arc<Mutex<GanglionInprocess>>
233    pub fn new_shared() -> Arc<Mutex<Self>> {
234        Arc::new(Mutex::new(Self::new()))
235    }
236
237    pub fn new_with_filters(
238        relevant_neurons: HashSet<String>,
239        ignored_neurons: HashSet<String>,
240    ) -> Self {
241        Self {
242            id: Uuid::now_v7(),
243            synapses_by_name: HashMap::new(),
244            relevant_neurons,
245            ignored_neurons,
246        }
247    }
248
249    pub fn create_synapse<T, C>(
250        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
251        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
252        error_reactants: Vec<Arc<dyn crate::reactant::ErrorReactant<T, C> + Send + Sync>>,
253    ) -> SynapseInprocess<T, C>
254    where
255        T: Send + Sync + 'static,
256        C: Codec<T> + CodecName + Send + Sync + 'static,
257        SynapseInprocess<T, C>: Send + Sync + 'static,
258    {
259        SynapseInprocess::<T, C>::new(neuron, reactants, error_reactants)
260    }
261
262    fn get_synapse_by_name(
263        &self,
264        name: &str,
265    ) -> Option<Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>> {
266        self.synapses_by_name.get(name).cloned()
267    }
268}
269
270impl Default for GanglionInprocess {
271    fn default() -> Self {
272        Self::new()
273    }
274}
275
276impl Ganglion for GanglionInprocess {
277    fn capable<T, C>(&mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
278    where
279        C: Codec<T> + CodecName + Send + Sync + 'static,
280        T: Send + Sync + 'static,
281    {
282        let neuron_name = neuron.name();
283
284        if !self.relevant_neurons.is_empty() && !self.relevant_neurons.contains(&neuron_name) {
285            return false;
286        }
287
288        if !self.ignored_neurons.is_empty() && self.ignored_neurons.contains(&neuron_name) {
289            return false;
290        }
291
292        true
293    }
294
295    fn adapt<T, C>(
296        &mut self,
297        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
298    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>
299    where
300        C: Codec<T> + CodecName + Send + Sync + 'static,
301        T: Send + Sync + 'static,
302    {
303        // Call capable and return early if false
304        if !self.capable(neuron.clone()) {
305            return Box::pin(async move {
306                Ok(()) // Not an error, just not capable
307            });
308        }
309
310        let neuron_name = neuron.name();
311
312        // Check if the synapse already exists
313        if self.synapses_by_name.contains_key(&neuron_name) {
314            return Box::pin(async move {
315                Ok(()) // Not an error, synapse already exists
316            });
317        }
318
319        // Create the synapse
320        let synapse = Self::create_synapse(neuron.clone(), vec![], vec![]);
321        let erased_synapse = erase_synapse_internal(synapse);
322
323        // Insert the synapse
324        self.synapses_by_name
325            .insert(neuron_name.clone(), erased_synapse);
326
327        Box::pin(async move { Ok(()) })
328    }
329}
330
331impl GanglionInternal for GanglionInprocess {
332    fn transmit(
333        &mut self,
334        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
335    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>> {
336        let neuron_name = payload.get_neuron_name();
337        tracing::debug!("GanglionInprocess::transmit called for neuron: {neuron_name}");
338
339        if let Some(synapse_lock) = self.get_synapse_by_name(&neuron_name) {
340            tracing::debug!("GanglionInprocess::transmit found synapse, acquiring read lock");
341            match synapse_lock.read() {
342                Ok(synapse_guard) => {
343                    tracing::debug!(
344                        "GanglionInprocess::transmit acquired read lock, calling transmit_erased"
345                    );
346                    let future = synapse_guard.transmit_erased(payload.clone());
347                    let ganglion_id = self.id;
348                    let ganglion_name = struct_name_of_type::<Self>().to_string();
349                    Box::pin(
350                        async move {
351                            tracing::debug!(
352                                "GanglionInprocess::transmit awaiting transmit_erased future"
353                            );
354                            let result = future.await;
355                            tracing::debug!("GanglionInprocess::transmit transmit_erased completed");
356                            result.map_err(|e| match e {
357                                crate::synapse::SynapseError::QueueFull { neuron_name: _ } => {
358                                    GanglionError::QueueFull {
359                                        neuron_name: neuron_name.clone(),
360                                        ganglion_name: ganglion_name.clone(),
361                                        ganglion_id,
362                                    }
363                                }
364                                _ => GanglionError::Transmit {
365                                    neuron_name: neuron_name.clone(),
366                                    ganglion_name: ganglion_name.clone(),
367                                    ganglion_id,
368                                    message: e.to_string(),
369                                },
370                            })
371                        }
372                        .instrument(payload.span_debug("GanglionInprocess::transmit")),
373                    )
374                }
375                Err(_) => {
376                    tracing::debug!("GanglionInprocess::transmit failed to acquire read lock");
377                    let ganglion_id = self.id;
378                    let ganglion_name = struct_name_of_type::<Self>().to_string();
379                    Box::pin(async move {
380                        Err(GanglionError::SynapseLock {
381                            neuron_name,
382                            ganglion_name,
383                            ganglion_id,
384                        })
385                    })
386                }
387            }
388        } else {
389            tracing::debug!("GanglionInprocess::transmit synapse not found");
390            let ganglion_id = self.id;
391            let ganglion_name = struct_name_of_type::<GanglionInprocess>().to_string();
392            Box::pin(async move {
393                Err(GanglionError::SynapseNotFound {
394                    neuron_name,
395                    ganglion_name,
396                    ganglion_id,
397                })
398            })
399        }
400    }
401
402    fn react(
403        &mut self,
404        neuron_name: String,
405        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
406        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
407    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
408        // Get the synapse by name
409        let synapse_lock_opt = self.get_synapse_by_name(&neuron_name);
410
411        // Check if the synapse exists
412        if synapse_lock_opt.is_none() {
413            let ganglion_id = self.id;
414            let ganglion_name = struct_name_of_type::<GanglionInprocess>().to_string();
415            return Box::pin(async move {
416                Err(GanglionError::SynapseNotFound {
417                    neuron_name,
418                    ganglion_name,
419                    ganglion_id,
420                })
421            });
422        }
423
424        // Get a write lock on the synapse
425        let synapse_lock = synapse_lock_opt.unwrap();
426        let mut synapse_guard = match synapse_lock.write() {
427            Ok(guard) => guard,
428            Err(_) => {
429                let ganglion_id = self.id;
430                let ganglion_name = struct_name_of_type::<GanglionInprocess>().to_string();
431                return Box::pin(async move {
432                    Err(GanglionError::SynapseLock {
433                        neuron_name,
434                        ganglion_name,
435                        ganglion_id,
436                    })
437                });
438            }
439        };
440
441        // Call react_erased on the synapse with the reactants
442        synapse_guard.react_erased(reactants, error_reactants);
443
444        // The write lock is automatically released when synapse_guard goes out of scope
445
446        Box::pin(async move { Ok(()) })
447    }
448
449    fn unique_id(&self) -> Uuid {
450        self.id
451    }
452}
453
454#[cfg(test)]
455mod tests {
456    use super::*;
457    use crate::erasure::payload::{erase_payload, erase_payload_raw};
458    use crate::erasure::reactant::{
459        ReactantErased, ReactantRawErased, erase_reactant, erase_reactant_raw,
460    };
461    use crate::namespace::NamespaceImpl;
462    use crate::neuron::{Neuron, NeuronImpl};
463    use crate::payload::{Payload, PayloadRaw};
464    use crate::test_utils::{
465        DebugCodec, DebugStruct, GanglionExternalInprocess, PingCodec, PingMsg, PingNeuron,
466        TokioMpscReactant, TokioMpscReactantGeneric, TokioMpscReactantRaw, test_namespace,
467    };
468    use std::collections::HashSet;
469    use std::sync::Arc;
470    use std::time::Duration;
471    use tokio::sync::mpsc::channel;
472    use tokio::task;
473    use tokio::time::sleep;
474    use uuid::Uuid;
475
476    #[test]
477    fn test_ganglion_error_with_ganglion_name() {
478        let ganglion_id = Uuid::now_v7();
479        let ganglion_name = struct_name_of_type::<GanglionInprocess>().to_string();
480
481        // Test SynapseNotFound error
482        let synapse_not_found = GanglionError::SynapseNotFound {
483            neuron_name: "test_neuron".to_string(),
484            ganglion_name: ganglion_name.clone(),
485            ganglion_id,
486        };
487        assert!(synapse_not_found.to_string().contains("test_neuron"));
488        assert!(synapse_not_found.to_string().contains("GanglionInprocess"));
489
490        // Test SynapseLockError error
491        let synapse_lock_error = GanglionError::SynapseLock {
492            neuron_name: "test_neuron".to_string(),
493            ganglion_name: ganglion_name.clone(),
494            ganglion_id,
495        };
496        assert!(synapse_lock_error.to_string().contains("test_neuron"));
497        assert!(synapse_lock_error.to_string().contains("GanglionInprocess"));
498
499        // Test new Transmit error
500        let transmit_error = GanglionError::Transmit {
501            neuron_name: "test_neuron".to_string(),
502            ganglion_name: ganglion_name.clone(),
503            ganglion_id,
504            message: "test failure".to_string(),
505        };
506        assert!(transmit_error.to_string().contains("test_neuron"));
507        assert!(transmit_error.to_string().contains("GanglionInprocess"));
508        assert!(transmit_error.to_string().contains("test failure"));
509    }
510    // test_ganglion_inprocess_neuron_by_name has been removed as part of removing get_neuron_by_name and neurons_by_name
511
512    #[tokio::test]
513    async fn test_ganglion_inprocess_get_synapse_by_name() {
514        let ns = test_namespace();
515        let neuron_impl_instance: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
516        let neuron_name_str = neuron_impl_instance.name();
517
518        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
519            Arc::new(neuron_impl_instance);
520
521        let mut ganglion = GanglionInprocess::new();
522
523        // Use adapt instead of add_neuron and populate_synapse
524        ganglion
525            .adapt(neuron.clone())
526            .await
527            .expect("Failed to adapt neuron for test");
528
529        let result_by_name1 = ganglion.get_synapse_by_name(&neuron_name_str);
530        assert!(result_by_name1.is_some());
531        let synapse_by_name1 = result_by_name1.unwrap();
532
533        let result_by_name2 = ganglion.get_synapse_by_name(&neuron_name_str);
534        assert!(result_by_name2.is_some());
535        let synapse_by_name2 = result_by_name2.unwrap();
536
537        assert!(
538            Arc::ptr_eq(&synapse_by_name1, &synapse_by_name2),
539            "Repeated calls to get_synapse_by_name should yield the same Arc instance."
540        );
541
542        let non_existent_result = ganglion.get_synapse_by_name("non_existent_neuron");
543        assert!(non_existent_result.is_none());
544    }
545
546    #[tokio::test]
547    async fn test_ganglion_inprocess_transmit_via_adapt() {
548        let ns = test_namespace();
549
550        let (tx1, mut rx1) = channel::<Arc<Payload<PingMsg, PingCodec>>>(10);
551        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
552
553        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
554
555        let neuron_arc = neuron_impl.clone_to_arc();
556
557        let mut ganglion: GanglionInprocess = GanglionInprocess::new();
558
559        // Use adapt instead of add_neuron and populate_synapse
560        ganglion
561            .adapt(neuron_arc.clone())
562            .await
563            .expect("Failed to adapt neuron");
564
565        let ping_neuron = Arc::new(PingNeuron::new(test_namespace()));
566        ganglion
567            .adapt(ping_neuron.clone())
568            .await
569            .expect("Failed to adapt ping neuron");
570
571        let reactants1: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> =
572            vec![erase_reactant::<PingMsg, PingCodec, _>(Box::new(
573                TokioMpscReactantGeneric::new(tx1.clone()),
574            ))];
575        let reactants2: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> =
576            vec![erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(
577                TokioMpscReactantGeneric::new(tx2.clone()),
578            ))];
579
580        ganglion
581            .react(ping_neuron.name(), reactants1, vec![])
582            .await
583            .expect("Failed to react ping");
584        ganglion
585            .react(neuron_arc.name(), reactants2, vec![])
586            .await
587            .expect("Failed to react debug");
588
589        let correlation_uuid1 = Uuid::now_v7();
590        let payload1 = Payload::builder()
591            .value(PingMsg { seq: 1 })
592            .correlation_id(correlation_uuid1)
593            .neuron(ping_neuron)
594            .build()
595            .unwrap();
596
597        let erased_payload1 = erase_payload(payload1);
598        ganglion
599            .transmit(erased_payload1)
600            .await
601            .expect("Failed to transmit payload1");
602
603        assert_eq!(
604            rx1.len(),
605            1,
606            "Reactant 1 should have received the first message (ping)"
607        );
608        let received_p1_ch1 = rx1.recv().await.unwrap();
609        assert_eq!(
610            received_p1_ch1.value.seq, 1,
611            "Payload value mismatch for reactant 1"
612        );
613        assert_eq!(
614            received_p1_ch1.correlation_id(), correlation_uuid1,
615            "Correlation ID mismatch for reactant 1"
616        );
617
618        let debug_struct_arc_2 = Arc::new(DebugStruct {
619            foo: 456,
620            bar: "ganglion_test_payload_2".to_string(),
621        });
622        let correlation_uuid2 = Uuid::now_v7();
623        let payload2 = Payload::builder()
624            .value((*debug_struct_arc_2).clone())
625            .correlation_id(correlation_uuid2)
626            .neuron(neuron_arc.clone())
627            .build()
628            .unwrap();
629
630        let erased_payload2 = erase_payload(payload2);
631        ganglion
632            .transmit(erased_payload2)
633            .await
634            .expect("Failed to transmit payload2");
635
636        assert_eq!(
637            rx2.len(),
638            1,
639            "Reactant 2 should have received the second message (debug)"
640        );
641        let received_p2_ch1 = rx2.recv().await.unwrap();
642        assert_eq!(
643            received_p2_ch1.value, debug_struct_arc_2,
644            "Second payload value mismatch for reactant 2"
645        );
646        assert_eq!(
647            received_p2_ch1.correlation_id(), correlation_uuid2,
648            "Second correlation ID mismatch for reactant 2"
649        );
650
651        // Ensure channels are empty now, indicating no unexpected messages
652        assert_eq!(
653            rx1.len(),
654            0,
655            "Reactant 1 channel should be empty after all expected messages"
656        );
657        assert_eq!(
658            rx2.len(),
659            0,
660            "Reactant 2 channel should be empty after all expected messages"
661        );
662    }
663
664    #[tokio::test]
665    async fn test_ganglion_inprocess_across_threads() {
666        // Create a struct to hold the shared state
667        struct SharedState {
668            // Channels to receive payloads from reactants
669            tx1: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
670            tx2: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
671            // Counter for received payloads
672            received_count: std::sync::atomic::AtomicUsize,
673        }
674
675        // Create channels with large buffer to avoid blocking
676        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
677        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
678
679        // Create shared state
680        let shared_state = Arc::new(SharedState {
681            tx1,
682            tx2,
683            received_count: std::sync::atomic::AtomicUsize::new(0),
684        });
685
686        // Number of threads and payloads per thread
687        let num_threads = 10;
688        let payloads_per_thread = 10;
689        let total_payloads = num_threads * payloads_per_thread;
690
691        // Create a vector to store all task handles
692        let mut handles = Vec::new();
693
694        // Spawn a task to receive payloads and count them
695        let receiver_state = shared_state.clone();
696        let receiver_handle = task::spawn(async move {
697            let mut received_payloads = Vec::new();
698
699            // Collect payloads from both channels
700            for _ in 0..total_payloads * 2 {
701                tokio::select! {
702                    Some(payload) = rx1.recv() => {
703                        received_payloads.push(payload);
704                        receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
705                    }
706                    Some(payload) = rx2.recv() => {
707                        received_payloads.push(payload);
708                        receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
709                    }
710                }
711            }
712
713            received_payloads
714        });
715
716        // Create shared namespace, neuron, and ganglion outside the tasks
717        let ns = test_namespace();
718        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
719        let neuron = neuron_impl.clone_to_arc();
720
721        let reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
722            erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
723                sender: shared_state.tx1.clone(),
724            })),
725            erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
726                sender: shared_state.tx2.clone(),
727            })),
728        ];
729
730        let mut shared_ganglion = GanglionInprocess::new();
731        // Use adapt instead of add_neuron and populate_synapse
732        shared_ganglion
733            .adapt(neuron.clone())
734            .await
735            .expect("Failed to adapt neuron");
736        shared_ganglion
737            .react(neuron.name(), reactants, vec![])
738            .await
739            .expect("Failed to react");
740
741        let shared_ganglion = Arc::new(tokio::sync::Mutex::new(shared_ganglion));
742
743        // Spawn multiple tasks that will transmit payloads using the shared ganglion
744        for thread_id in 0..num_threads {
745            let ganglion = shared_ganglion.clone();
746            let neuron_clone = neuron.clone();
747
748            // Spawn a new task
749            let handle = task::spawn(async move {
750                // Transmit payloads through this task's ganglion
751                for i in 0..payloads_per_thread {
752                    // Create a unique payload for this thread and iteration
753                    let payload_id = thread_id * payloads_per_thread + i;
754                    let debug_struct = Arc::new(DebugStruct {
755                        foo: payload_id as i32,
756                        bar: format!("thread_{thread_id}_payload_{i}"),
757                    });
758
759                    let correlation_uuid = Uuid::now_v7();
760                    let payload = Payload::builder()
761                        .value((*debug_struct).clone())
762                        .correlation_id(correlation_uuid)
763                        .neuron(neuron_clone.clone())
764                        .build()
765                        .unwrap();
766
767                    // Add a small delay to increase the chance of thread interleaving
768                    sleep(Duration::from_millis(1)).await;
769
770                    // Transmit the payload through the shared ganglion
771                    let mut ganglion_guard = ganglion.lock().await;
772                    let erased_payload = erase_payload(payload);
773                    let _ = ganglion_guard.transmit(erased_payload).await;
774                }
775            });
776
777            handles.push(handle);
778        }
779
780        // Wait for all transmitter tasks to complete
781        for handle in handles {
782            handle.await.unwrap();
783        }
784
785        // Wait for the receiver task to complete and get the received payloads
786        let received_payloads = receiver_handle.await.unwrap();
787
788        // Verify that we received the expected number of payloads
789        assert_eq!(
790            shared_state
791                .received_count
792                .load(std::sync::atomic::Ordering::SeqCst),
793            total_payloads * 2,
794            "Should have received all payloads on both reactants"
795        );
796
797        // Verify that we received payloads from all threads
798        let mut foo_values = received_payloads
799            .iter()
800            .map(|p| p.value.foo)
801            .collect::<Vec<_>>();
802        foo_values.sort();
803        foo_values.dedup();
804
805        assert_eq!(
806            foo_values.len(),
807            total_payloads,
808            "Should have received payloads with all expected foo values"
809        );
810
811        // Check that the foo values match the expected range
812        for i in 0..total_payloads {
813            assert!(
814                foo_values.contains(&(i as i32)),
815                "Should have received a payload with foo={i}"
816            );
817        }
818
819        // Verify that all correlation_ids are preserved
820        let mut correlation_ids = received_payloads
821            .iter()
822            .map(|p| p.correlation_id())
823            .collect::<Vec<_>>();
824
825        // Each correlation_id should appear exactly twice (once from each reactant)
826        // So we should have total_payloads unique correlation_ids
827        correlation_ids.sort();
828
829        // Count occurrences of each correlation_id
830        let mut correlation_id_counts = std::collections::HashMap::new();
831        for id in &correlation_ids {
832            *correlation_id_counts.entry(*id).or_insert(0) += 1;
833        }
834
835        // Verify we have the expected number of unique correlation_ids
836        assert_eq!(
837            correlation_id_counts.len(),
838            total_payloads,
839            "Should have received payloads with all expected correlation_ids"
840        );
841
842        // Verify each correlation_id appears exactly twice (once from each reactant)
843        for (id, count) in correlation_id_counts {
844            assert_eq!(
845                count, 2,
846                "Correlation ID {id} should appear exactly twice (once from each reactant)"
847            );
848        }
849    }
850
851    #[tokio::test]
852    async fn test_ganglion_external_unique_id() {
853        use crate::test_utils::GanglionExternalInprocess;
854        let ganglion1 = GanglionExternalInprocess::new();
855        let ganglion2 = GanglionExternalInprocess::new();
856
857        // Each ganglion should have a unique ID
858        assert_ne!(ganglion1.unique_id(), ganglion2.unique_id());
859
860        // The same ganglion should return the same ID consistently
861        assert_eq!(ganglion1.unique_id(), ganglion1.unique_id());
862    }
863
864    #[tokio::test]
865    async fn test_ganglion_inprocess_capable_with_relevant_neurons() {
866        // Create neurons
867        let neuron1 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
868            NamespaceImpl {
869                delimiter: ".",
870                parts: vec!["dev", "plexo", "1"],
871            },
872        )));
873        let neuron2 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
874            NamespaceImpl {
875                delimiter: ".",
876                parts: vec!["dev", "plexo", "2"],
877            },
878        )));
879        // Create ganglion with only neuron1 in relevant_neurons
880        let mut relevant_neurons = HashSet::new();
881        relevant_neurons.insert("dev.plexo.1.DebugStruct.debug".to_string());
882        let mut ganglion = GanglionInprocess::new_with_filters(relevant_neurons, HashSet::new());
883
884        // Test that neuron1 is capable
885        assert!(ganglion.capable(neuron1.clone()));
886
887        // Test that neuron2 is not capable
888        assert!(!ganglion.capable(neuron2.clone()));
889    }
890
891    #[tokio::test]
892    async fn test_ganglion_inprocess_capable_with_ignored_neurons() {
893        // Create neurons
894        let neuron1 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
895            NamespaceImpl {
896                delimiter: ".",
897                parts: vec!["dev", "plexo", "1"],
898            },
899        )));
900        let neuron2 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
901            NamespaceImpl {
902                delimiter: ".",
903                parts: vec!["dev", "plexo", "2"],
904            },
905        )));
906
907        // Create ganglion with neuron1 in ignored_neurons
908        let mut ignored_neurons = HashSet::new();
909        ignored_neurons.insert("dev.plexo.1.DebugStruct.debug".to_string());
910        let mut ganglion = GanglionInprocess::new_with_filters(HashSet::new(), ignored_neurons);
911
912        // Test that neuron1 is not capable (ignored)
913        assert!(!ganglion.capable(neuron1.clone()));
914
915        // Test that neuron2 is capable (not ignored)
916        assert!(ganglion.capable(neuron2.clone()));
917    }
918
919    #[tokio::test]
920    async fn test_ganglion_external_inprocess_capable_with_relevant_neurons() {
921        // Create neurons
922        let neuron1 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
923            NamespaceImpl {
924                delimiter: ".",
925                parts: vec!["dev", "plexo", "1"],
926            },
927        )));
928        let neuron2 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
929            NamespaceImpl {
930                delimiter: ".",
931                parts: vec!["dev", "plexo", "2"],
932            },
933        )));
934
935        // Create ganglion with only neuron1 in relevant_neurons
936        let mut relevant_neurons = HashSet::new();
937        relevant_neurons.insert("dev.plexo.1.DebugStruct.debug".to_string());
938        let mut ganglion =
939            GanglionExternalInprocess::new_with_filters(relevant_neurons, HashSet::new());
940
941        // Test that neuron1 is capable
942        assert!(ganglion.capable(neuron1.clone()));
943
944        // Test that neuron2 is not capable
945        assert!(!ganglion.capable(neuron2.clone()));
946    }
947
948    #[tokio::test]
949    async fn test_ganglion_external_inprocess_capable_with_ignored_neurons() {
950        // Create neurons
951        let neuron1 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
952            NamespaceImpl {
953                delimiter: ".",
954                parts: vec!["dev", "plexo", "1"],
955            },
956        )));
957        let neuron2 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
958            NamespaceImpl {
959                delimiter: ".",
960                parts: vec!["dev", "plexo", "2"],
961            },
962        )));
963
964        // Create ganglion with neuron1 in ignored_neurons
965        let mut ignored_neurons = HashSet::new();
966        ignored_neurons.insert("dev.plexo.1.DebugStruct.debug".to_string());
967        let mut ganglion =
968            GanglionExternalInprocess::new_with_filters(HashSet::new(), ignored_neurons);
969
970        // Test that neuron1 is not capable (ignored)
971        assert!(!ganglion.capable(neuron1.clone()));
972
973        // Test that neuron2 is capable (not ignored)
974        assert!(ganglion.capable(neuron2.clone()));
975    }
976
977    #[tokio::test]
978    async fn test_ganglion_inprocess_capable_default_behavior() {
979        // Create neuron
980        let neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
981            NamespaceImpl {
982                delimiter: ".",
983                parts: vec!["dev", "plexo"],
984            },
985        )));
986
987        // Create ganglion with default constructor (no filters)
988        let mut ganglion = GanglionInprocess::new();
989
990        // Test that neuron is capable (default behavior should accept all)
991        assert!(ganglion.capable(neuron.clone()));
992    }
993
994    #[tokio::test]
995    async fn test_ganglion_external_inprocess_capable_default_behavior() {
996        // Create neuron
997        let neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(test_namespace()));
998
999        // Create ganglion with default constructor (no filters)
1000        let mut ganglion = GanglionExternalInprocess::new();
1001
1002        // Test that neuron is capable (default behavior should accept all)
1003        assert!(ganglion.capable(neuron.clone()));
1004    }
1005
1006    #[tokio::test]
1007    async fn test_ganglion_external_transmit_encoded() {
1008        let ns = test_namespace();
1009        let mut ganglion = GanglionExternalInprocess::new();
1010        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1011        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
1012            Arc::new(neuron_impl.clone());
1013
1014        // Adapt the neuron to the ganglion first
1015        ganglion
1016            .adapt(neuron.clone())
1017            .await
1018            .expect("Failed to adapt neuron");
1019
1020        let debug_struct_value = DebugStruct {
1021            foo: 42,
1022            bar: "test_value".to_owned(),
1023        };
1024        let debug_struct_arc = Arc::new(debug_struct_value);
1025        let correlation_id = Uuid::now_v7();
1026        let encoded = neuron_impl
1027            .encode(debug_struct_arc.as_ref())
1028            .expect("Encoding should succeed in test");
1029
1030        let payload_raw =
1031            PayloadRaw::with_correlation(encoded, Some(neuron.clone()), Some(correlation_id));
1032
1033        let erased_payload = erase_payload_raw(payload_raw);
1034        let result = ganglion
1035            .transmit_encoded(erased_payload)
1036            .await
1037            .expect("Failed to transmit encoded payload");
1038
1039        // For the test implementation, we expect empty vectors since no reactants were added
1040        assert_eq!(result.0.len(), 0);
1041        assert_eq!(result.1.len(), 0);
1042    }
1043
1044    #[tokio::test]
1045    async fn test_ganglion_external_adapt() {
1046        let mut ganglion = GanglionExternalInprocess::new();
1047        let ns = test_namespace();
1048        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1049        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
1050            Arc::new(neuron_impl);
1051
1052        // This should complete without error
1053        ganglion
1054            .adapt(neuron)
1055            .await
1056            .expect("Failed to adapt neuron");
1057    }
1058
1059    #[tokio::test]
1060    async fn test_ganglion_external_inprocess_transmit_via_adapt() {
1061        let ns = test_namespace();
1062
1063        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
1064        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
1065        let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(2);
1066        let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(2);
1067
1068        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1069        let neuron_arc = neuron_impl.clone_to_arc();
1070
1071        let reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
1072            erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
1073                sender: tx1.clone(),
1074            })),
1075            erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
1076                sender: tx2.clone(),
1077            })),
1078        ];
1079
1080        let raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
1081            erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
1082                sender: raw_tx1.clone(),
1083            })),
1084            erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
1085                sender: raw_tx2.clone(),
1086            })),
1087        ];
1088
1089        let mut ganglion: GanglionExternalInprocess = GanglionExternalInprocess::new();
1090
1091        ganglion
1092            .adapt(neuron_arc.clone())
1093            .await
1094            .expect("Failed to adapt neuron");
1095        ganglion
1096            .react(neuron_arc.name(), reactants, raw_reactants, vec![])
1097            .await
1098            .expect("Failed to react");
1099
1100        let debug_struct_arc = Arc::new(DebugStruct {
1101            foo: 123,
1102            bar: "ganglion_external_test_payload_1".to_string(),
1103        });
1104        let correlation_uuid1 = Uuid::now_v7();
1105        let encoded = neuron_impl
1106            .encode(debug_struct_arc.as_ref())
1107            .expect("Encoding should succeed in test");
1108        let payload_raw1 = PayloadRaw::with_correlation(
1109            encoded,
1110            Some(neuron_arc.clone()),
1111            Some(correlation_uuid1),
1112        );
1113
1114        let erased_payload1 = erase_payload_raw(payload_raw1);
1115        ganglion
1116            .transmit_encoded(erased_payload1)
1117            .await
1118            .expect("Failed to transmit encoded payload1");
1119
1120        // Check that raw reactants received the payload
1121        let received_raw_p1_ch1 =
1122            tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx1.recv())
1123                .await
1124                .expect("Timeout raw_rx1")
1125                .expect("Closed raw_rx1");
1126        assert_eq!(
1127            received_raw_p1_ch1.correlation_id(), correlation_uuid1,
1128            "Raw correlation ID mismatch for reactant 1"
1129        );
1130
1131        let received_raw_p1_ch2 =
1132            tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx2.recv())
1133                .await
1134                .expect("Timeout raw_rx2")
1135                .expect("Closed raw_rx2");
1136        assert_eq!(
1137            received_raw_p1_ch2.correlation_id(), correlation_uuid1,
1138            "Raw correlation ID mismatch for reactant 2"
1139        );
1140
1141        // Check that regular reactants also received the decoded payload
1142        let received_p1_ch1 =
1143            tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
1144                .await
1145                .expect("Timeout rx1")
1146                .expect("Closed rx1");
1147        assert_eq!(
1148            received_p1_ch1.value, debug_struct_arc,
1149            "Payload value mismatch for reactant 1"
1150        );
1151        assert_eq!(
1152            received_p1_ch1.correlation_id(), correlation_uuid1,
1153            "Correlation ID mismatch for reactant 1"
1154        );
1155
1156        let received_p1_ch2 =
1157            tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1158                .await
1159                .expect("Timeout rx2")
1160                .expect("Closed rx2");
1161        assert_eq!(
1162            received_p1_ch2.value, debug_struct_arc,
1163            "Payload value mismatch for reactant 2"
1164        );
1165        assert_eq!(
1166            received_p1_ch2.correlation_id(), correlation_uuid1,
1167            "Correlation ID mismatch for reactant 2"
1168        );
1169
1170        // Send a second payload
1171        let debug_struct_arc_2 = Arc::new(DebugStruct {
1172            foo: 456,
1173            bar: "ganglion_external_test_payload_2".to_string(),
1174        });
1175        let correlation_uuid2 = Uuid::now_v7();
1176        let encoded2 = neuron_impl
1177            .encode(debug_struct_arc_2.as_ref())
1178            .expect("Encoding should succeed in test");
1179        let payload_raw2 = PayloadRaw::with_correlation(
1180            encoded2,
1181            Some(neuron_arc.clone()),
1182            Some(correlation_uuid2),
1183        );
1184
1185        let erased_payload2 = erase_payload_raw(payload_raw2);
1186        ganglion
1187            .transmit_encoded(erased_payload2)
1188            .await
1189            .expect("Failed to transmit encoded payload2");
1190
1191        // Check raw reactants received the second payload
1192        let received_raw_p2_ch1 =
1193            tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx1.recv())
1194                .await
1195                .expect("Timeout raw_rx1_2")
1196                .expect("Closed raw_rx1_2");
1197        assert_eq!(
1198            received_raw_p2_ch1.correlation_id(), correlation_uuid2,
1199            "Second raw correlation ID mismatch for reactant 1"
1200        );
1201
1202        let received_raw_p2_ch2 =
1203            tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx2.recv())
1204                .await
1205                .expect("Timeout raw_rx2_2")
1206                .expect("Closed raw_rx2_2");
1207        assert_eq!(
1208            received_raw_p2_ch2.correlation_id(), correlation_uuid2,
1209            "Second raw correlation ID mismatch for reactant 2"
1210        );
1211
1212        // Check regular reactants received the second decoded payload
1213        let received_p2_ch1 =
1214            tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
1215                .await
1216                .expect("Timeout rx1_2")
1217                .expect("Closed rx1_2");
1218        assert_eq!(
1219            received_p2_ch1.value, debug_struct_arc_2,
1220            "Second payload value mismatch for reactant 1"
1221        );
1222        assert_eq!(
1223            received_p2_ch1.correlation_id(), correlation_uuid2,
1224            "Second correlation ID mismatch for reactant 1"
1225        );
1226
1227        let received_p2_ch2 =
1228            tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1229                .await
1230                .expect("Timeout rx2_2")
1231                .expect("Closed rx2_2");
1232        assert_eq!(
1233            received_p2_ch2.value, debug_struct_arc_2,
1234            "Second payload value mismatch for reactant 2"
1235        );
1236        assert_eq!(
1237            received_p2_ch2.correlation_id(), correlation_uuid2,
1238            "Second correlation ID mismatch for reactant 2"
1239        );
1240
1241        // Ensure channels are empty now, indicating no unexpected messages
1242        assert_eq!(
1243            rx1.len(),
1244            0,
1245            "Reactant 1 channel should be empty after all expected messages"
1246        );
1247        assert_eq!(
1248            rx2.len(),
1249            0,
1250            "Reactant 2 channel should be empty after all expected messages"
1251        );
1252        assert_eq!(
1253            raw_rx1.len(),
1254            0,
1255            "Raw reactant 1 channel should be empty after all expected messages"
1256        );
1257        assert_eq!(
1258            raw_rx2.len(),
1259            0,
1260            "Raw reactant 2 channel should be empty after all expected messages"
1261        );
1262    }
1263
1264    #[tokio::test]
1265    async fn test_ganglion_inprocess_adapt_erased() {
1266        let ns = test_namespace();
1267
1268        // Create a neuron
1269        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1270        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
1271            Arc::new(neuron_impl);
1272
1273        // Create a ganglion
1274        let mut ganglion = GanglionInprocess::new();
1275
1276        // Use the regular adapt method instead of adapt_erased
1277        ganglion
1278            .adapt(neuron.clone())
1279            .await
1280            .expect("Failed to adapt neuron");
1281
1282        // Verify that a synapse was created and stored
1283        let neuron_name = neuron.name();
1284        let synapse = ganglion.get_synapse_by_name(&neuron_name);
1285        assert!(synapse.is_some());
1286    }
1287
1288    #[tokio::test]
1289    async fn test_ganglion_external_inprocess_across_threads() {
1290        use crate::ganglion::GanglionExternal;
1291        use crate::neuron::NeuronImpl;
1292        use crate::payload::PayloadRaw;
1293        use crate::test_utils::{
1294            DebugCodec, DebugStruct, GanglionExternalInprocess, TokioMpscReactant,
1295            TokioMpscReactantRaw, test_namespace,
1296        };
1297        use std::sync::Arc;
1298        use tokio::sync::mpsc::channel;
1299        use tokio::task;
1300        use tokio::time::{Duration, sleep};
1301        use uuid::Uuid;
1302
1303        // Create a struct to hold the shared state
1304        struct SharedState {
1305            // Channels to receive payloads from reactants
1306            tx1: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
1307            tx2: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
1308            raw_tx1: tokio::sync::mpsc::Sender<Arc<PayloadRaw<DebugStruct, DebugCodec>>>,
1309            raw_tx2: tokio::sync::mpsc::Sender<Arc<PayloadRaw<DebugStruct, DebugCodec>>>,
1310            // Counter for received payloads
1311            received_count: std::sync::atomic::AtomicUsize,
1312            raw_received_count: std::sync::atomic::AtomicUsize,
1313        }
1314
1315        // Create channels with large buffer to avoid blocking
1316        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
1317        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
1318        let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(100);
1319        let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(100);
1320
1321        // Create shared state
1322        let shared_state = Arc::new(SharedState {
1323            tx1,
1324            tx2,
1325            raw_tx1,
1326            raw_tx2,
1327            received_count: std::sync::atomic::AtomicUsize::new(0),
1328            raw_received_count: std::sync::atomic::AtomicUsize::new(0),
1329        });
1330
1331        // Number of threads and payloads per thread
1332        let num_threads = 10;
1333        let payloads_per_thread = 10;
1334        let total_payloads = num_threads * payloads_per_thread;
1335
1336        // Create a vector to store all task handles
1337        let mut handles = Vec::new();
1338
1339        // Spawn a task to receive payloads and count them
1340        let receiver_state = shared_state.clone();
1341        let receiver_handle = task::spawn(async move {
1342            let mut received_payloads = Vec::new();
1343            let mut received_raw_payloads = Vec::new();
1344
1345            // Collect payloads from all channels
1346            for _ in 0..total_payloads * 4 {
1347                // 2 regular + 2 raw channels
1348                tokio::select! {
1349                    Some(payload) = rx1.recv() => {
1350                        received_payloads.push(payload);
1351                        receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1352                    }
1353                    Some(payload) = rx2.recv() => {
1354                        received_payloads.push(payload);
1355                        receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1356                    }
1357                    Some(payload) = raw_rx1.recv() => {
1358                        received_raw_payloads.push(payload);
1359                        receiver_state.raw_received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1360                    }
1361                    Some(payload) = raw_rx2.recv() => {
1362                        received_raw_payloads.push(payload);
1363                        receiver_state.raw_received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1364                    }
1365                }
1366            }
1367
1368            (received_payloads, received_raw_payloads)
1369        });
1370
1371        // Create shared namespace, neuron, and ganglion outside the tasks
1372        let ns = test_namespace();
1373        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1374        let neuron = neuron_impl.clone_to_arc();
1375
1376        // Create a single shared ganglion with reactants
1377        let reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
1378            erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
1379                sender: shared_state.tx1.clone(),
1380            })),
1381            erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
1382                sender: shared_state.tx2.clone(),
1383            })),
1384        ];
1385
1386        let raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
1387            erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
1388                sender: shared_state.raw_tx1.clone(),
1389            })),
1390            erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
1391                sender: shared_state.raw_tx2.clone(),
1392            })),
1393        ];
1394
1395        let mut shared_ganglion = GanglionExternalInprocess::new();
1396        shared_ganglion
1397            .adapt(neuron.clone())
1398            .await
1399            .expect("Failed to adapt neuron");
1400        shared_ganglion
1401            .react(neuron.name(), reactants, raw_reactants, vec![])
1402            .await
1403            .expect("Failed to react");
1404
1405        let shared_ganglion = Arc::new(tokio::sync::Mutex::new(shared_ganglion));
1406
1407        // Spawn multiple tasks that will transmit payloads using the shared ganglion
1408        for thread_id in 0..num_threads {
1409            let ganglion = shared_ganglion.clone();
1410            let neuron_clone = neuron.clone();
1411            let neuron_impl_clone = neuron_impl.clone();
1412
1413            // Spawn a new task
1414            let handle = task::spawn(async move {
1415                // Transmit payloads through this task's ganglion
1416                for i in 0..payloads_per_thread {
1417                    // Create a unique payload for this thread and iteration
1418                    let payload_id = thread_id * payloads_per_thread + i;
1419                    let debug_struct = Arc::new(DebugStruct {
1420                        foo: payload_id as i32,
1421                        bar: format!("external_thread_{thread_id}_payload_{i}"),
1422                    });
1423
1424                    let correlation_uuid = Uuid::now_v7();
1425                    let encoded = neuron_impl_clone
1426                        .encode(debug_struct.as_ref())
1427                        .expect("Encoding should succeed in test");
1428                    let payload_raw = PayloadRaw::with_correlation(
1429                        encoded,
1430                        Some(neuron_clone.clone()),
1431                        Some(correlation_uuid),
1432                    );
1433
1434                    // Add a small delay to increase the chance of thread interleaving
1435                    sleep(Duration::from_millis(1)).await;
1436
1437                    // Transmit the payload through the shared ganglion
1438                    let mut ganglion_guard = ganglion.lock().await;
1439                    let erased_payload = erase_payload_raw(payload_raw);
1440                    ganglion_guard
1441                        .transmit_encoded(erased_payload)
1442                        .await
1443                        .expect("Failed to transmit encoded payload");
1444                }
1445            });
1446
1447            handles.push(handle);
1448        }
1449
1450        // Wait for all transmitter tasks to complete
1451        for handle in handles {
1452            handle.await.unwrap();
1453        }
1454
1455        // Wait for the receiver task to complete and get the received payloads
1456        let (received_payloads, received_raw_payloads) = receiver_handle.await.unwrap();
1457
1458        // Verify that we received the expected number of payloads
1459        assert_eq!(
1460            shared_state
1461                .received_count
1462                .load(std::sync::atomic::Ordering::SeqCst),
1463            total_payloads * 2,
1464            "Should have received all decoded payloads on both regular reactants"
1465        );
1466
1467        assert_eq!(
1468            shared_state
1469                .raw_received_count
1470                .load(std::sync::atomic::Ordering::SeqCst),
1471            total_payloads * 2,
1472            "Should have received all raw payloads on both raw reactants"
1473        );
1474
1475        // Verify that we received payloads from all threads (regular reactants)
1476        let mut foo_values = received_payloads
1477            .iter()
1478            .map(|p| p.value.foo)
1479            .collect::<Vec<_>>();
1480        foo_values.sort();
1481        foo_values.dedup();
1482
1483        assert_eq!(
1484            foo_values.len(),
1485            total_payloads,
1486            "Should have received payloads with all expected foo values"
1487        );
1488
1489        // Check that the foo values match the expected range
1490        for i in 0..total_payloads {
1491            assert!(
1492                foo_values.contains(&(i as i32)),
1493                "Should have received a payload with foo={i}"
1494            );
1495        }
1496
1497        // Verify that all correlation_ids are preserved (regular reactants)
1498        let mut correlation_ids = received_payloads
1499            .iter()
1500            .map(|p| p.correlation_id())
1501            .collect::<Vec<_>>();
1502
1503        // Each correlation_id should appear exactly twice (once from each reactant)
1504        // So we should have total_payloads unique correlation_ids
1505        correlation_ids.sort();
1506
1507        // Count occurrences of each correlation_id
1508        let mut correlation_id_counts = std::collections::HashMap::new();
1509        for id in &correlation_ids {
1510            *correlation_id_counts.entry(*id).or_insert(0) += 1;
1511        }
1512
1513        // Verify we have the expected number of unique correlation_ids (regular reactants)
1514        assert_eq!(
1515            correlation_id_counts.len(),
1516            total_payloads,
1517            "Should have received payloads with all expected correlation_ids"
1518        );
1519
1520        // Verify each correlation_id appears exactly twice (once from each reactant)
1521        for (id, count) in correlation_id_counts {
1522            assert_eq!(
1523                count, 2,
1524                "Correlation ID {id} should appear exactly twice (once from each regular reactant)"
1525            );
1526        }
1527
1528        // Verify that all correlation_ids are preserved (raw reactants)
1529        let mut raw_correlation_ids = received_raw_payloads
1530            .iter()
1531            .map(|p| p.correlation_id())
1532            .collect::<Vec<_>>();
1533
1534        raw_correlation_ids.sort();
1535
1536        // Count occurrences of each correlation_id for raw reactants
1537        let mut raw_correlation_id_counts = std::collections::HashMap::new();
1538        for id in &raw_correlation_ids {
1539            *raw_correlation_id_counts.entry(*id).or_insert(0) += 1;
1540        }
1541
1542        // Verify we have the expected number of unique correlation_ids for raw reactants
1543        assert_eq!(
1544            raw_correlation_id_counts.len(),
1545            total_payloads,
1546            "Should have received raw payloads with all expected correlation_ids"
1547        );
1548
1549        // Verify each correlation_id appears exactly twice for raw reactants (once from each raw reactant)
1550        for (id, count) in raw_correlation_id_counts {
1551            assert_eq!(
1552                count, 2,
1553                "Correlation ID {id} should appear exactly twice (once from each raw reactant)"
1554            );
1555        }
1556    }
1557}