Skip to main content

plexor_core/
ganglion.rs

1// Copyright 2025 Alecks Gates
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use crate::codec::{Codec, CodecName};
8use crate::erasure::payload::{PayloadErased, PayloadRawErased};
9use crate::erasure::reactant::{ErrorReactantErased, ReactantErased, ReactantRawErased};
10use crate::erasure::synapse::{SynapseInternalErased, erase_synapse_internal};
11use crate::logging::LogTrace;
12use crate::neuron::{Neuron, NeuronError};
13use crate::synapse::SynapseInprocess;
14use crate::utils::struct_name_of_type;
15use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::future::Future;
18use std::pin::Pin;
19use std::sync::Arc;
20use parking_lot::RwLock;
21use thiserror::Error;
22use tokio::sync::Mutex;
23use tracing::Instrument;
24use uuid::Uuid;
25
26#[derive(Error, Debug)]
27pub enum GanglionError {
28    #[error("Synapse not found: {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
29    SynapseNotFound {
30        neuron_name: String,
31        ganglion_name: String,
32        ganglion_id: Uuid,
33    },
34    #[error(
35        "Failed to acquire lock on synapse: {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})"
36    )]
37    SynapseLock {
38        neuron_name: String,
39        ganglion_name: String,
40        ganglion_id: Uuid,
41    },
42    #[error(
43        "Transmission error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id}): {message}"
44    )]
45    Transmit {
46        neuron_name: String,
47        ganglion_name: String,
48        ganglion_id: Uuid,
49        message: String,
50    },
51    #[error("Encode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
52    Encode {
53        neuron_name: String,
54        ganglion_name: String,
55        ganglion_id: Uuid,
56    },
57    #[error("Decode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
58    Decode {
59        neuron_name: String,
60        ganglion_name: String,
61        ganglion_id: Uuid,
62    },
63    #[error(
64        "Adaptation error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})"
65    )]
66    Adapt {
67        neuron_name: String,
68        ganglion_name: String,
69        ganglion_id: Uuid,
70    },
71    #[error("Queue full for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
72    QueueFull {
73        neuron_name: String,
74        ganglion_name: String,
75        ganglion_id: Uuid,
76    },
77}
78
79impl GanglionError {
80    /// Convert a NeuronError to a GanglionError with additional context
81    pub fn from_neuron_error(
82        neuron_error: NeuronError,
83        ganglion_name: String,
84        ganglion_id: Uuid,
85    ) -> Self {
86        match neuron_error {
87            NeuronError::Encode { neuron_name, .. } => GanglionError::Encode {
88                neuron_name,
89                ganglion_name,
90                ganglion_id,
91            },
92            NeuronError::Decode { neuron_name, .. } => GanglionError::Decode {
93                neuron_name,
94                ganglion_name,
95                ganglion_id,
96            },
97        }
98    }
99
100    /// Convert a PlexusError to a GanglionError with additional context
101    pub fn from_plexus_error(
102        plexus_error: crate::plexus::PlexusError,
103        neuron_name: String,
104        ganglion_name: String,
105        ganglion_id: Uuid,
106    ) -> Self {
107        match plexus_error {
108            // If the PlexusError already contains a GanglionError, return it as-is
109            crate::plexus::PlexusError::Ganglion(ganglion_error) => ganglion_error,
110            // For all other PlexusError variants, convert to Adapt error
111            _ => GanglionError::Adapt {
112                neuron_name,
113                ganglion_name,
114                ganglion_id,
115            },
116        }
117    }
118}
119
120pub trait Ganglion {
121    fn capable<T, C>(&mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
122    where
123        C: Codec<T> + CodecName + Send + Sync + 'static,
124        T: Send + Sync + 'static;
125
126    fn adapt<T, C>(
127        &mut self,
128        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
129    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>
130    where
131        C: Codec<T> + CodecName + Send + Sync + 'static,
132        T: Send + Sync + 'static;
133}
134
135/// Internal interface for Ganglion that handles type-erased payloads and reactants.
136pub trait GanglionInternal {
137    fn transmit(
138        &mut self,
139        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
140    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>>;
141
142    fn react(
143        &mut self,
144        neuron_name: String,
145        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
146        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
147    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>;
148
149    /// React to multiple neurons at once.
150    fn react_many(
151        &mut self,
152        reactions: HashMap<
153            String,
154            (
155                Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
156                Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
157            ),
158        >,
159    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
160        let mut futures = Vec::new();
161        for (name, (rs, ers)) in reactions {
162            futures.push(self.react(name, rs, ers));
163        }
164        Box::pin(async move {
165            for f in futures {
166                f.await?;
167            }
168            Ok(())
169        })
170    }
171
172    fn unique_id(&self) -> Uuid;
173}
174
175/// External interface for Ganglion that handles network-level type-erased data.
176pub trait GanglionExternal {
177    fn transmit(
178        &mut self,
179        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
180    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>>;
181
182    #[allow(clippy::type_complexity)]
183    fn transmit_encoded(
184        &mut self,
185        payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
186    ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), GanglionError>> + Send + 'static>>;
187
188    fn react(
189        &mut self,
190        neuron_name: String,
191        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
192        raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
193        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
194    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>;
195
196    /// React to multiple neurons at once.
197    fn react_many(
198        &mut self,
199        reactions: HashMap<
200            String,
201            (
202                Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
203                Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
204                Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
205            ),
206        >,
207    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
208        let mut futures = Vec::new();
209        for (name, (rs, rrs, ers)) in reactions {
210            futures.push(self.react(name, rs, rrs, ers));
211        }
212        Box::pin(async move {
213            for f in futures {
214                f.await?;
215            }
216            Ok(())
217        })
218    }
219
220    fn unique_id(&self) -> Uuid;
221}
222
223/// Helper function to adapt a single neuron to multiple ganglia at once.
224/// Note: All ganglia in the slice must be of the same type G.
225/// For heterogeneous ganglia, use the `adapt_all!` macro instead.
226pub async fn adapt_all<T, C, G>(
227    ganglia: &[Arc<Mutex<G>>],
228    neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
229) -> Result<(), GanglionError>
230where
231    C: Codec<T> + CodecName + Send + Sync + 'static,
232    T: Send + Sync + 'static,
233    G: Ganglion + ?Sized,
234{
235    for ganglion_mutex in ganglia {
236        let mut ganglion = ganglion_mutex.lock().await;
237        ganglion.adapt(neuron.clone()).await?;
238    }
239    Ok(())
240}
241
242/// Macro to adapt a single neuron to multiple heterogeneous ganglia at once.
243///
244/// # Examples
245///
246/// ```rust
247/// # use std::sync::Arc;
248/// # use tokio::sync::Mutex;
249/// # use plexor_core::adapt_all;
250/// # use plexor_core::neuron::{Neuron, NeuronImpl};
251/// # use plexor_core::ganglion::{Ganglion, GanglionInprocess};
252/// # use plexor_core::namespace::NamespaceImpl;
253/// # use plexor_core::codec::{Codec, CodecError, CodecName};
254/// #
255/// # #[derive(Debug)]
256/// # struct Dummy;
257/// # impl CodecName for Dummy { fn name() -> &'static str { "dummy" } }
258/// # impl Codec<Dummy> for Dummy {
259/// #     fn encode(_: &Dummy) -> Result<Vec<u8>, CodecError> { Ok(vec![]) }
260/// #     fn decode(_: &[u8]) -> Result<Dummy, CodecError> { Ok(Dummy) }
261/// # }
262/// #
263/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
264/// let ns = Arc::new(NamespaceImpl { delimiter: ".", parts: vec!["test"] });
265/// let neuron = Arc::new(NeuronImpl::<Dummy, Dummy>::new(ns));
266/// let g1 = Arc::new(Mutex::new(GanglionInprocess::new()));
267/// let g2 = Arc::new(Mutex::new(GanglionInprocess::new()));
268///
269/// adapt_all!(neuron, g1, g2).await.unwrap();
270/// # });
271/// ```
272#[macro_export]
273macro_rules! adapt_all {
274    ($neuron:expr, $($ganglion:expr),+ $(,)?) => {
275        async {
276            $(
277                $ganglion.lock().await.adapt($neuron.clone()).await?;
278            )+
279            Result::<(), $crate::ganglion::GanglionError>::Ok(())
280        }
281    };
282}
283
284/// Macro to adapt multiple neurons to multiple heterogeneous ganglia at once.
285///
286/// This locks each ganglion once and adapts all provided neurons to it.
287///
288/// # Examples
289///
290/// ```rust
291/// # use std::sync::Arc;
292/// # use tokio::sync::Mutex;
293/// # use plexor_core::adapt_many;
294/// # use plexor_core::neuron::{Neuron, NeuronImpl};
295/// # use plexor_core::ganglion::{Ganglion, GanglionInprocess};
296/// # use plexor_core::namespace::NamespaceImpl;
297/// # use plexor_core::codec::{Codec, CodecError, CodecName};
298/// #
299/// # #[derive(Debug, Clone)]
300/// # struct Dummy;
301/// # impl CodecName for Dummy { fn name() -> &'static str { "dummy" } }
302/// # impl Codec<Dummy> for Dummy {
303/// #     fn encode(_: &Dummy) -> Result<Vec<u8>, CodecError> { Ok(vec![]) }
304/// #     fn decode(_: &[u8]) -> Result<Dummy, CodecError> { Ok(Dummy) }
305/// # }
306/// #
307/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
308/// let ns = Arc::new(NamespaceImpl { delimiter: ".", parts: vec!["test"] });
309/// let n1 = Arc::new(NeuronImpl::<Dummy, Dummy>::new(ns.clone()));
310/// let n2 = Arc::new(NeuronImpl::<Dummy, Dummy>::new(ns));
311/// let g1 = Arc::new(Mutex::new(GanglionInprocess::new()));
312/// let g2 = Arc::new(Mutex::new(GanglionInprocess::new()));
313///
314/// let res: Result<(), plexor_core::ganglion::GanglionError> = adapt_many!([n1, n2], g1, g2).await;
315/// res.unwrap();
316/// # });
317/// ```
318#[macro_export]
319macro_rules! adapt_many {
320    // Base case: simple expansion for a single ganglion
321    ([$($neuron:expr),+ $(,)?], $ganglion:expr $(,)?) => {
322        async {
323            let mut g = $ganglion.lock().await;
324            $(
325                g.adapt($neuron.clone()).await?;
326            )+
327            Result::<(), $crate::ganglion::GanglionError>::Ok(())
328        }
329    };
330
331    // Recursive step: handle head ganglion, then recurse for tail
332    ([$($neuron:expr),+ $(,)?], $head_ganglion:expr, $($tail_ganglion:expr),+ $(,)?) => {
333        async {
334            {
335                let mut g = $head_ganglion.lock().await;
336                $(
337                    g.adapt($neuron.clone()).await?;
338                )+
339            }
340            $crate::adapt_many!([$($neuron),+], $($tail_ganglion),+).await
341        }
342    };
343}
344
345pub struct GanglionInprocess {
346    id: Uuid,
347    synapses_by_name:
348        HashMap<String, Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>>,
349    /// Neurons that this ganglion will handle (if empty, handles all)
350    relevant_neurons: HashSet<String>,
351    /// Neurons that this ganglion will ignore
352    ignored_neurons: HashSet<String>,
353}
354
355impl GanglionInprocess {
356    pub fn new() -> Self {
357        Self {
358            id: Uuid::now_v7(),
359            synapses_by_name: HashMap::new(),
360            relevant_neurons: HashSet::new(),
361            ignored_neurons: HashSet::new(),
362        }
363    }
364
365    /// Helper to create an Arc<Mutex<GanglionInprocess>>
366    pub fn new_shared() -> Arc<Mutex<Self>> {
367        Arc::new(Mutex::new(Self::new()))
368    }
369
370    pub fn new_with_filters(
371        relevant_neurons: HashSet<String>,
372        ignored_neurons: HashSet<String>,
373    ) -> Self {
374        Self {
375            id: Uuid::now_v7(),
376            synapses_by_name: HashMap::new(),
377            relevant_neurons,
378            ignored_neurons,
379        }
380    }
381
382    fn get_synapse_by_name(
383        &self,
384        name: &str,
385    ) -> Option<Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>> {
386        self.synapses_by_name.get(name).cloned()
387    }
388}
389
390impl Default for GanglionInprocess {
391    fn default() -> Self {
392        Self::new()
393    }
394}
395
396impl Ganglion for GanglionInprocess {
397    fn capable<T, C>(&mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
398    where
399        C: Codec<T> + CodecName + Send + Sync + 'static,
400        T: Send + Sync + 'static,
401    {
402        let neuron_name = neuron.name();
403
404        if !self.relevant_neurons.is_empty() && !self.relevant_neurons.contains(&neuron_name) {
405            return false;
406        }
407
408        if !self.ignored_neurons.is_empty() && self.ignored_neurons.contains(&neuron_name) {
409            return false;
410        }
411
412        true
413    }
414
415    fn adapt<T, C>(
416        &mut self,
417        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
418    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>
419    where
420        C: Codec<T> + CodecName + Send + Sync + 'static,
421        T: Send + Sync + 'static,
422    {
423        // Call capable and return early if false
424        if !self.capable(neuron.clone()) {
425            return Box::pin(async move {
426                Ok(()) // Not an error, just not capable
427            });
428        }
429
430        let neuron_name = neuron.name();
431
432        // Check if the synapse already exists
433        if self.synapses_by_name.contains_key(&neuron_name) {
434            return Box::pin(async move {
435                Ok(()) // Not an error, synapse already exists
436            });
437        }
438
439        // Create the synapse
440        let synapse = SynapseInprocess::<T, C>::new(neuron.clone(), vec![], vec![]);
441        let erased_synapse = erase_synapse_internal(synapse);
442
443        // Insert the synapse
444        self.synapses_by_name
445            .insert(neuron_name.clone(), erased_synapse);
446
447        Box::pin(async move { Ok(()) })
448    }
449}
450
451impl GanglionInternal for GanglionInprocess {
452    fn transmit(
453        &mut self,
454        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
455    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>> {
456        let neuron_name = payload.get_neuron_name();
457        tracing::debug!("GanglionInprocess::transmit called for neuron: {neuron_name}");
458
459        if let Some(synapse_lock) = self.get_synapse_by_name(&neuron_name) {
460            tracing::debug!("GanglionInprocess::transmit found synapse, acquiring read lock");
461            let synapse_guard = synapse_lock.read();
462            tracing::debug!(
463                "GanglionInprocess::transmit acquired read lock, calling transmit_erased"
464            );
465            let future = synapse_guard.transmit_erased(payload.clone());
466            let ganglion_id = self.id;
467            let ganglion_name = struct_name_of_type::<Self>().to_string();
468            Box::pin(
469                async move {
470                    tracing::debug!(
471                        "GanglionInprocess::transmit awaiting transmit_erased future"
472                    );
473                    let result = future.await;
474                    tracing::debug!("GanglionInprocess::transmit transmit_erased completed");
475                    result.map_err(|e| match e {
476                        crate::synapse::SynapseError::QueueFull { neuron_name: _ } => {
477                            GanglionError::QueueFull {
478                                neuron_name: neuron_name.clone(),
479                                ganglion_name: ganglion_name.clone(),
480                                ganglion_id,
481                            }
482                        }
483                        _ => GanglionError::Transmit {
484                            neuron_name: neuron_name.clone(),
485                            ganglion_name: ganglion_name.clone(),
486                            ganglion_id,
487                            message: e.to_string(),
488                        },
489                    })
490                }
491                .instrument(payload.span_debug("GanglionInprocess::transmit")),
492            )
493        } else {
494            tracing::debug!("GanglionInprocess::transmit synapse not found");
495            let ganglion_id = self.id;
496            let ganglion_name = struct_name_of_type::<GanglionInprocess>().to_string();
497            Box::pin(async move {
498                Err(GanglionError::SynapseNotFound {
499                    neuron_name,
500                    ganglion_name,
501                    ganglion_id,
502                })
503            })
504        }
505    }
506
507    fn react(
508        &mut self,
509        neuron_name: String,
510        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
511        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
512    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
513        // Get the synapse by name
514        let synapse_lock_opt = self.get_synapse_by_name(&neuron_name);
515
516        // Check if the synapse exists
517        if synapse_lock_opt.is_none() {
518            let ganglion_id = self.id;
519            let ganglion_name = struct_name_of_type::<GanglionInprocess>().to_string();
520            return Box::pin(async move {
521                Err(GanglionError::SynapseNotFound {
522                    neuron_name,
523                    ganglion_name,
524                    ganglion_id,
525                })
526            });
527        }
528
529        // Get a write lock on the synapse
530        let synapse_lock = synapse_lock_opt.unwrap();
531        let mut synapse_guard = synapse_lock.write();
532
533        // Call react_erased on the synapse with the reactants
534        synapse_guard.react_erased(reactants, error_reactants);
535
536        // The write lock is automatically released when synapse_guard goes out of scope
537
538        Box::pin(async move { Ok(()) })
539    }
540
541    fn react_many(
542        &mut self,
543        reactions: HashMap<
544            String,
545            (
546                Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
547                Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
548            ),
549        >,
550    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
551        for (neuron_name, (reactants, error_reactants)) in reactions {
552            if let Some(synapse_lock) = self.get_synapse_by_name(&neuron_name) {
553                let mut synapse_guard = synapse_lock.write();
554                synapse_guard.react_erased(reactants, error_reactants);
555            } else {
556                let ganglion_id = self.id;
557                let ganglion_name = struct_name_of_type::<GanglionInprocess>().to_string();
558                return Box::pin(async move {
559                    Err(GanglionError::SynapseNotFound {
560                        neuron_name,
561                        ganglion_name,
562                        ganglion_id,
563                    })
564                });
565            }
566        }
567        Box::pin(async move { Ok(()) })
568    }
569
570    fn unique_id(&self) -> Uuid {
571        self.id
572    }
573}
574
575#[cfg(test)]
576mod tests {
577    use super::*;
578    use crate::erasure::payload::{erase_payload, erase_payload_raw};
579    use crate::erasure::reactant::{
580        ReactantErased, ReactantRawErased, erase_reactant, erase_reactant_raw,
581    };
582    use crate::namespace::NamespaceImpl;
583    use crate::neuron::{Neuron, NeuronImpl};
584    use crate::payload::{Payload, PayloadRaw};
585    use crate::test_utils::{
586        DebugCodec, DebugStruct, GanglionExternalInprocess, PingCodec, PingMsg, PingNeuron,
587        TokioMpscReactant, TokioMpscReactantGeneric, TokioMpscReactantRaw, test_namespace,
588    };
589    use std::collections::HashSet;
590    use std::sync::Arc;
591    use std::time::Duration;
592    use tokio::sync::mpsc::channel;
593    use tokio::task;
594    use tokio::time::sleep;
595    use uuid::Uuid;
596
597    #[test]
598    fn test_ganglion_error_with_ganglion_name() {
599        let ganglion_id = Uuid::now_v7();
600        let ganglion_name = struct_name_of_type::<GanglionInprocess>().to_string();
601
602        // Test SynapseNotFound error
603        let synapse_not_found = GanglionError::SynapseNotFound {
604            neuron_name: "test_neuron".to_string(),
605            ganglion_name: ganglion_name.clone(),
606            ganglion_id,
607        };
608        assert!(synapse_not_found.to_string().contains("test_neuron"));
609        assert!(synapse_not_found.to_string().contains("GanglionInprocess"));
610
611        // Test SynapseLockError error
612        let synapse_lock_error = GanglionError::SynapseLock {
613            neuron_name: "test_neuron".to_string(),
614            ganglion_name: ganglion_name.clone(),
615            ganglion_id,
616        };
617        assert!(synapse_lock_error.to_string().contains("test_neuron"));
618        assert!(synapse_lock_error.to_string().contains("GanglionInprocess"));
619
620        // Test new Transmit error
621        let transmit_error = GanglionError::Transmit {
622            neuron_name: "test_neuron".to_string(),
623            ganglion_name: ganglion_name.clone(),
624            ganglion_id,
625            message: "test failure".to_string(),
626        };
627        assert!(transmit_error.to_string().contains("test_neuron"));
628        assert!(transmit_error.to_string().contains("GanglionInprocess"));
629        assert!(transmit_error.to_string().contains("test failure"));
630    }
631    // test_ganglion_inprocess_neuron_by_name has been removed as part of removing get_neuron_by_name and neurons_by_name
632
633    #[tokio::test]
634    async fn test_ganglion_inprocess_get_synapse_by_name() {
635        let ns = test_namespace();
636        let neuron_impl_instance: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
637        let neuron_name_str = neuron_impl_instance.name();
638
639        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
640            Arc::new(neuron_impl_instance);
641
642        let mut ganglion = GanglionInprocess::new();
643
644        // Use adapt instead of add_neuron and populate_synapse
645        ganglion
646            .adapt(neuron.clone())
647            .await
648            .expect("Failed to adapt neuron for test");
649
650        let result_by_name1 = ganglion.get_synapse_by_name(&neuron_name_str);
651        assert!(result_by_name1.is_some());
652        let synapse_by_name1 = result_by_name1.unwrap();
653
654        let result_by_name2 = ganglion.get_synapse_by_name(&neuron_name_str);
655        assert!(result_by_name2.is_some());
656        let synapse_by_name2 = result_by_name2.unwrap();
657
658        assert!(
659            Arc::ptr_eq(&synapse_by_name1, &synapse_by_name2),
660            "Repeated calls to get_synapse_by_name should yield the same Arc instance."
661        );
662
663        let non_existent_result = ganglion.get_synapse_by_name("non_existent_neuron");
664        assert!(non_existent_result.is_none());
665    }
666
667    #[tokio::test]
668    async fn test_ganglion_inprocess_transmit_via_adapt() {
669        let ns = test_namespace();
670
671        let (tx1, mut rx1) = channel::<Arc<Payload<PingMsg, PingCodec>>>(10);
672        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
673
674        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
675
676        let neuron_arc = neuron_impl.clone_to_arc();
677
678        let mut ganglion: GanglionInprocess = GanglionInprocess::new();
679
680        // Use adapt instead of add_neuron and populate_synapse
681        ganglion
682            .adapt(neuron_arc.clone())
683            .await
684            .expect("Failed to adapt neuron");
685
686        let ping_neuron = Arc::new(PingNeuron::new(test_namespace()));
687        ganglion
688            .adapt(ping_neuron.clone())
689            .await
690            .expect("Failed to adapt ping neuron");
691
692        let reactants1: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> =
693            vec![erase_reactant::<PingMsg, PingCodec, _>(Box::new(
694                TokioMpscReactantGeneric::new(tx1.clone()),
695            ))];
696        let reactants2: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> =
697            vec![erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(
698                TokioMpscReactantGeneric::new(tx2.clone()),
699            ))];
700
701        ganglion
702            .react(ping_neuron.name(), reactants1, vec![])
703            .await
704            .expect("Failed to react ping");
705        ganglion
706            .react(neuron_arc.name(), reactants2, vec![])
707            .await
708            .expect("Failed to react debug");
709
710        let correlation_uuid1 = Uuid::now_v7();
711        let payload1 = Payload::builder()
712            .value(PingMsg { seq: 1 })
713            .correlation_id(correlation_uuid1)
714            .neuron(ping_neuron)
715            .build()
716            .unwrap();
717
718        let erased_payload1 = erase_payload(payload1);
719        ganglion
720            .transmit(erased_payload1)
721            .await
722            .expect("Failed to transmit payload1");
723
724        assert_eq!(
725            rx1.len(),
726            1,
727            "Reactant 1 should have received the first message (ping)"
728        );
729        let received_p1_ch1 = rx1.recv().await.unwrap();
730        assert_eq!(
731            received_p1_ch1.value.seq, 1,
732            "Payload value mismatch for reactant 1"
733        );
734        assert_eq!(
735            received_p1_ch1.correlation_id(), correlation_uuid1,
736            "Correlation ID mismatch for reactant 1"
737        );
738
739        let debug_struct_arc_2 = Arc::new(DebugStruct {
740            foo: 456,
741            bar: "ganglion_test_payload_2".to_string(),
742        });
743        let correlation_uuid2 = Uuid::now_v7();
744        let payload2 = Payload::builder()
745            .value((*debug_struct_arc_2).clone())
746            .correlation_id(correlation_uuid2)
747            .neuron(neuron_arc.clone())
748            .build()
749            .unwrap();
750
751        let erased_payload2 = erase_payload(payload2);
752        ganglion
753            .transmit(erased_payload2)
754            .await
755            .expect("Failed to transmit payload2");
756
757        assert_eq!(
758            rx2.len(),
759            1,
760            "Reactant 2 should have received the second message (debug)"
761        );
762        let received_p2_ch1 = rx2.recv().await.unwrap();
763        assert_eq!(
764            received_p2_ch1.value, debug_struct_arc_2,
765            "Second payload value mismatch for reactant 2"
766        );
767        assert_eq!(
768            received_p2_ch1.correlation_id(), correlation_uuid2,
769            "Second correlation ID mismatch for reactant 2"
770        );
771
772        // Ensure channels are empty now, indicating no unexpected messages
773        assert_eq!(
774            rx1.len(),
775            0,
776            "Reactant 1 channel should be empty after all expected messages"
777        );
778        assert_eq!(
779            rx2.len(),
780            0,
781            "Reactant 2 channel should be empty after all expected messages"
782        );
783    }
784
785    #[tokio::test]
786    async fn test_ganglion_inprocess_across_threads() {
787        // Create a struct to hold the shared state
788        struct SharedState {
789            // Channels to receive payloads from reactants
790            tx1: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
791            tx2: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
792            // Counter for received payloads
793            received_count: std::sync::atomic::AtomicUsize,
794        }
795
796        // Create channels with large buffer to avoid blocking
797        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
798        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
799
800        // Create shared state
801        let shared_state = Arc::new(SharedState {
802            tx1,
803            tx2,
804            received_count: std::sync::atomic::AtomicUsize::new(0),
805        });
806
807        // Number of threads and payloads per thread
808        let num_threads = 10;
809        let payloads_per_thread = 10;
810        let total_payloads = num_threads * payloads_per_thread;
811
812        // Create a vector to store all task handles
813        let mut handles = Vec::new();
814
815        // Spawn a task to receive payloads and count them
816        let receiver_state = shared_state.clone();
817        let receiver_handle = task::spawn(async move {
818            let mut received_payloads = Vec::new();
819
820            // Collect payloads from both channels
821            for _ in 0..total_payloads * 2 {
822                tokio::select! {
823                    Some(payload) = rx1.recv() => {
824                        received_payloads.push(payload);
825                        receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
826                    }
827                    Some(payload) = rx2.recv() => {
828                        received_payloads.push(payload);
829                        receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
830                    }
831                }
832            }
833
834            received_payloads
835        });
836
837        // Create shared namespace, neuron, and ganglion outside the tasks
838        let ns = test_namespace();
839        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
840        let neuron = neuron_impl.clone_to_arc();
841
842        let reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
843            erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
844                sender: shared_state.tx1.clone(),
845            })),
846            erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
847                sender: shared_state.tx2.clone(),
848            })),
849        ];
850
851        let mut shared_ganglion = GanglionInprocess::new();
852        // Use adapt instead of add_neuron and populate_synapse
853        shared_ganglion
854            .adapt(neuron.clone())
855            .await
856            .expect("Failed to adapt neuron");
857        shared_ganglion
858            .react(neuron.name(), reactants, vec![])
859            .await
860            .expect("Failed to react");
861
862        let shared_ganglion = Arc::new(tokio::sync::Mutex::new(shared_ganglion));
863
864        // Spawn multiple tasks that will transmit payloads using the shared ganglion
865        for thread_id in 0..num_threads {
866            let ganglion = shared_ganglion.clone();
867            let neuron_clone = neuron.clone();
868
869            // Spawn a new task
870            let handle = task::spawn(async move {
871                // Transmit payloads through this task's ganglion
872                for i in 0..payloads_per_thread {
873                    // Create a unique payload for this thread and iteration
874                    let payload_id = thread_id * payloads_per_thread + i;
875                    let debug_struct = Arc::new(DebugStruct {
876                        foo: payload_id as i32,
877                        bar: format!("thread_{thread_id}_payload_{i}"),
878                    });
879
880                    let correlation_uuid = Uuid::now_v7();
881                    let payload = Payload::builder()
882                        .value((*debug_struct).clone())
883                        .correlation_id(correlation_uuid)
884                        .neuron(neuron_clone.clone())
885                        .build()
886                        .unwrap();
887
888                    // Add a small delay to increase the chance of thread interleaving
889                    sleep(Duration::from_millis(1)).await;
890
891                    // Transmit the payload through the shared ganglion
892                    let mut ganglion_guard = ganglion.lock().await;
893                    let erased_payload = erase_payload(payload);
894                    let _ = ganglion_guard.transmit(erased_payload).await;
895                }
896            });
897
898            handles.push(handle);
899        }
900
901        // Wait for all transmitter tasks to complete
902        for handle in handles {
903            handle.await.unwrap();
904        }
905
906        // Wait for the receiver task to complete and get the received payloads
907        let received_payloads = receiver_handle.await.unwrap();
908
909        // Verify that we received the expected number of payloads
910        assert_eq!(
911            shared_state
912                .received_count
913                .load(std::sync::atomic::Ordering::SeqCst),
914            total_payloads * 2,
915            "Should have received all payloads on both reactants"
916        );
917
918        // Verify that we received payloads from all threads
919        let mut foo_values = received_payloads
920            .iter()
921            .map(|p| p.value.foo)
922            .collect::<Vec<_>>();
923        foo_values.sort();
924        foo_values.dedup();
925
926        assert_eq!(
927            foo_values.len(),
928            total_payloads,
929            "Should have received payloads with all expected foo values"
930        );
931
932        // Check that the foo values match the expected range
933        for i in 0..total_payloads {
934            assert!(
935                foo_values.contains(&(i as i32)),
936                "Should have received a payload with foo={i}"
937            );
938        }
939
940        // Verify that all correlation_ids are preserved
941        let mut correlation_ids = received_payloads
942            .iter()
943            .map(|p| p.correlation_id())
944            .collect::<Vec<_>>();
945
946        // Each correlation_id should appear exactly twice (once from each reactant)
947        // So we should have total_payloads unique correlation_ids
948        correlation_ids.sort();
949
950        // Count occurrences of each correlation_id
951        let mut correlation_id_counts = std::collections::HashMap::new();
952        for id in &correlation_ids {
953            *correlation_id_counts.entry(*id).or_insert(0) += 1;
954        }
955
956        // Verify we have the expected number of unique correlation_ids
957        assert_eq!(
958            correlation_id_counts.len(),
959            total_payloads,
960            "Should have received payloads with all expected correlation_ids"
961        );
962
963        // Verify each correlation_id appears exactly twice (once from each reactant)
964        for (id, count) in correlation_id_counts {
965            assert_eq!(
966                count, 2,
967                "Correlation ID {id} should appear exactly twice (once from each reactant)"
968            );
969        }
970    }
971
972    #[tokio::test]
973    async fn test_ganglion_external_unique_id() {
974        use crate::test_utils::GanglionExternalInprocess;
975        let ganglion1 = GanglionExternalInprocess::new();
976        let ganglion2 = GanglionExternalInprocess::new();
977
978        // Each ganglion should have a unique ID
979        assert_ne!(ganglion1.unique_id(), ganglion2.unique_id());
980
981        // The same ganglion should return the same ID consistently
982        assert_eq!(ganglion1.unique_id(), ganglion1.unique_id());
983    }
984
985    #[tokio::test]
986    async fn test_ganglion_inprocess_capable_with_relevant_neurons() {
987        // Create neurons
988        let neuron1 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
989            NamespaceImpl {
990                delimiter: ".",
991                parts: vec!["dev", "plexo", "1"],
992            },
993        )));
994        let neuron2 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
995            NamespaceImpl {
996                delimiter: ".",
997                parts: vec!["dev", "plexo", "2"],
998            },
999        )));
1000        // Create ganglion with only neuron1 in relevant_neurons
1001        let mut relevant_neurons = HashSet::new();
1002        relevant_neurons.insert("dev.plexo.1.DebugStruct.debug".to_string());
1003        let mut ganglion = GanglionInprocess::new_with_filters(relevant_neurons, HashSet::new());
1004
1005        // Test that neuron1 is capable
1006        assert!(ganglion.capable(neuron1.clone()));
1007
1008        // Test that neuron2 is not capable
1009        assert!(!ganglion.capable(neuron2.clone()));
1010    }
1011
1012    #[tokio::test]
1013    async fn test_ganglion_inprocess_capable_with_ignored_neurons() {
1014        // Create neurons
1015        let neuron1 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
1016            NamespaceImpl {
1017                delimiter: ".",
1018                parts: vec!["dev", "plexo", "1"],
1019            },
1020        )));
1021        let neuron2 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
1022            NamespaceImpl {
1023                delimiter: ".",
1024                parts: vec!["dev", "plexo", "2"],
1025            },
1026        )));
1027
1028        // Create ganglion with neuron1 in ignored_neurons
1029        let mut ignored_neurons = HashSet::new();
1030        ignored_neurons.insert("dev.plexo.1.DebugStruct.debug".to_string());
1031        let mut ganglion = GanglionInprocess::new_with_filters(HashSet::new(), ignored_neurons);
1032
1033        // Test that neuron1 is not capable (ignored)
1034        assert!(!ganglion.capable(neuron1.clone()));
1035
1036        // Test that neuron2 is capable (not ignored)
1037        assert!(ganglion.capable(neuron2.clone()));
1038    }
1039
1040    #[tokio::test]
1041    async fn test_ganglion_external_inprocess_capable_with_relevant_neurons() {
1042        // Create neurons
1043        let neuron1 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
1044            NamespaceImpl {
1045                delimiter: ".",
1046                parts: vec!["dev", "plexo", "1"],
1047            },
1048        )));
1049        let neuron2 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
1050            NamespaceImpl {
1051                delimiter: ".",
1052                parts: vec!["dev", "plexo", "2"],
1053            },
1054        )));
1055
1056        // Create ganglion with only neuron1 in relevant_neurons
1057        let mut relevant_neurons = HashSet::new();
1058        relevant_neurons.insert("dev.plexo.1.DebugStruct.debug".to_string());
1059        let mut ganglion =
1060            GanglionExternalInprocess::new_with_filters(relevant_neurons, HashSet::new());
1061
1062        // Test that neuron1 is capable
1063        assert!(ganglion.capable(neuron1.clone()));
1064
1065        // Test that neuron2 is not capable
1066        assert!(!ganglion.capable(neuron2.clone()));
1067    }
1068
1069    #[tokio::test]
1070    async fn test_ganglion_external_inprocess_capable_with_ignored_neurons() {
1071        // Create neurons
1072        let neuron1 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
1073            NamespaceImpl {
1074                delimiter: ".",
1075                parts: vec!["dev", "plexo", "1"],
1076            },
1077        )));
1078        let neuron2 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
1079            NamespaceImpl {
1080                delimiter: ".",
1081                parts: vec!["dev", "plexo", "2"],
1082            },
1083        )));
1084
1085        // Create ganglion with neuron1 in ignored_neurons
1086        let mut ignored_neurons = HashSet::new();
1087        ignored_neurons.insert("dev.plexo.1.DebugStruct.debug".to_string());
1088        let mut ganglion =
1089            GanglionExternalInprocess::new_with_filters(HashSet::new(), ignored_neurons);
1090
1091        // Test that neuron1 is not capable (ignored)
1092        assert!(!ganglion.capable(neuron1.clone()));
1093
1094        // Test that neuron2 is capable (not ignored)
1095        assert!(ganglion.capable(neuron2.clone()));
1096    }
1097
1098    #[tokio::test]
1099    async fn test_ganglion_inprocess_capable_default_behavior() {
1100        // Create neuron
1101        let neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
1102            NamespaceImpl {
1103                delimiter: ".",
1104                parts: vec!["dev", "plexo"],
1105            },
1106        )));
1107
1108        // Create ganglion with default constructor (no filters)
1109        let mut ganglion = GanglionInprocess::new();
1110
1111        // Test that neuron is capable (default behavior should accept all)
1112        assert!(ganglion.capable(neuron.clone()));
1113    }
1114
1115    #[tokio::test]
1116    async fn test_ganglion_external_inprocess_capable_default_behavior() {
1117        // Create neuron
1118        let neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(test_namespace()));
1119
1120        // Create ganglion with default constructor (no filters)
1121        let mut ganglion = GanglionExternalInprocess::new();
1122
1123        // Test that neuron is capable (default behavior should accept all)
1124        assert!(ganglion.capable(neuron.clone()));
1125    }
1126
1127    #[tokio::test]
1128    async fn test_ganglion_external_transmit_encoded() {
1129        let ns = test_namespace();
1130        let mut ganglion = GanglionExternalInprocess::new();
1131        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1132        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
1133            Arc::new(neuron_impl.clone());
1134
1135        // Adapt the neuron to the ganglion first
1136        ganglion
1137            .adapt(neuron.clone())
1138            .await
1139            .expect("Failed to adapt neuron");
1140
1141        let debug_struct_value = DebugStruct {
1142            foo: 42,
1143            bar: "test_value".to_owned(),
1144        };
1145        let debug_struct_arc = Arc::new(debug_struct_value);
1146        let correlation_id = Uuid::now_v7();
1147        let encoded = neuron_impl
1148            .encode(debug_struct_arc.as_ref())
1149            .expect("Encoding should succeed in test");
1150
1151        let payload_raw =
1152            PayloadRaw::with_correlation(encoded, neuron.clone(), Some(correlation_id));
1153
1154        let erased_payload = erase_payload_raw(payload_raw);
1155        let result = ganglion
1156            .transmit_encoded(erased_payload)
1157            .await
1158            .expect("Failed to transmit encoded payload");
1159
1160        // For the test implementation, we expect empty vectors since no reactants were added
1161        assert_eq!(result.0.len(), 0);
1162        assert_eq!(result.1.len(), 0);
1163    }
1164
1165    #[tokio::test]
1166    async fn test_ganglion_external_adapt() {
1167        let mut ganglion = GanglionExternalInprocess::new();
1168        let ns = test_namespace();
1169        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1170        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
1171            Arc::new(neuron_impl);
1172
1173        // This should complete without error
1174        ganglion
1175            .adapt(neuron)
1176            .await
1177            .expect("Failed to adapt neuron");
1178    }
1179
1180    #[tokio::test]
1181    async fn test_ganglion_external_inprocess_transmit_via_adapt() {
1182        let ns = test_namespace();
1183
1184        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
1185        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
1186        let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(2);
1187        let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(2);
1188
1189        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1190        let neuron_arc = neuron_impl.clone_to_arc();
1191
1192        let reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
1193            erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
1194                sender: tx1.clone(),
1195            })),
1196            erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
1197                sender: tx2.clone(),
1198            })),
1199        ];
1200
1201        let raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
1202            erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
1203                sender: raw_tx1.clone(),
1204            })),
1205            erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
1206                sender: raw_tx2.clone(),
1207            })),
1208        ];
1209
1210        let mut ganglion: GanglionExternalInprocess = GanglionExternalInprocess::new();
1211
1212        ganglion
1213            .adapt(neuron_arc.clone())
1214            .await
1215            .expect("Failed to adapt neuron");
1216        ganglion
1217            .react(neuron_arc.name(), reactants, raw_reactants, vec![])
1218            .await
1219            .expect("Failed to react");
1220
1221        let debug_struct_arc = Arc::new(DebugStruct {
1222            foo: 123,
1223            bar: "ganglion_external_test_payload_1".to_string(),
1224        });
1225        let correlation_uuid1 = Uuid::now_v7();
1226        let encoded = neuron_impl
1227            .encode(debug_struct_arc.as_ref())
1228            .expect("Encoding should succeed in test");
1229        let payload_raw1 = PayloadRaw::with_correlation(
1230            encoded,
1231            neuron_arc.clone(),
1232            Some(correlation_uuid1),
1233        );
1234
1235        let erased_payload1 = erase_payload_raw(payload_raw1);
1236        ganglion
1237            .transmit_encoded(erased_payload1)
1238            .await
1239            .expect("Failed to transmit encoded payload1");
1240
1241        // Check that raw reactants received the payload
1242        let received_raw_p1_ch1 =
1243            tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx1.recv())
1244                .await
1245                .expect("Timeout raw_rx1")
1246                .expect("Closed raw_rx1");
1247        assert_eq!(
1248            received_raw_p1_ch1.correlation_id(), correlation_uuid1,
1249            "Raw correlation ID mismatch for reactant 1"
1250        );
1251
1252        let received_raw_p1_ch2 =
1253            tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx2.recv())
1254                .await
1255                .expect("Timeout raw_rx2")
1256                .expect("Closed raw_rx2");
1257        assert_eq!(
1258            received_raw_p1_ch2.correlation_id(), correlation_uuid1,
1259            "Raw correlation ID mismatch for reactant 2"
1260        );
1261
1262        // Check that regular reactants also received the decoded payload
1263        let received_p1_ch1 =
1264            tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
1265                .await
1266                .expect("Timeout rx1")
1267                .expect("Closed rx1");
1268        assert_eq!(
1269            received_p1_ch1.value, debug_struct_arc,
1270            "Payload value mismatch for reactant 1"
1271        );
1272        assert_eq!(
1273            received_p1_ch1.correlation_id(), correlation_uuid1,
1274            "Correlation ID mismatch for reactant 1"
1275        );
1276
1277        let received_p1_ch2 =
1278            tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1279                .await
1280                .expect("Timeout rx2")
1281                .expect("Closed rx2");
1282        assert_eq!(
1283            received_p1_ch2.value, debug_struct_arc,
1284            "Payload value mismatch for reactant 2"
1285        );
1286        assert_eq!(
1287            received_p1_ch2.correlation_id(), correlation_uuid1,
1288            "Correlation ID mismatch for reactant 2"
1289        );
1290
1291        // Send a second payload
1292        let debug_struct_arc_2 = Arc::new(DebugStruct {
1293            foo: 456,
1294            bar: "ganglion_external_test_payload_2".to_string(),
1295        });
1296        let correlation_uuid2 = Uuid::now_v7();
1297        let encoded2 = neuron_impl
1298            .encode(debug_struct_arc_2.as_ref())
1299            .expect("Encoding should succeed in test");
1300        let payload_raw2 = PayloadRaw::with_correlation(
1301            encoded2,
1302            neuron_arc.clone(),
1303            Some(correlation_uuid2),
1304        );
1305
1306        let erased_payload2 = erase_payload_raw(payload_raw2);
1307        ganglion
1308            .transmit_encoded(erased_payload2)
1309            .await
1310            .expect("Failed to transmit encoded payload2");
1311
1312        // Check raw reactants received the second payload
1313        let received_raw_p2_ch1 =
1314            tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx1.recv())
1315                .await
1316                .expect("Timeout raw_rx1_2")
1317                .expect("Closed raw_rx1_2");
1318        assert_eq!(
1319            received_raw_p2_ch1.correlation_id(), correlation_uuid2,
1320            "Second raw correlation ID mismatch for reactant 1"
1321        );
1322
1323        let received_raw_p2_ch2 =
1324            tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx2.recv())
1325                .await
1326                .expect("Timeout raw_rx2_2")
1327                .expect("Closed raw_rx2_2");
1328        assert_eq!(
1329            received_raw_p2_ch2.correlation_id(), correlation_uuid2,
1330            "Second raw correlation ID mismatch for reactant 2"
1331        );
1332
1333        // Check regular reactants received the second decoded payload
1334        let received_p2_ch1 =
1335            tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
1336                .await
1337                .expect("Timeout rx1_2")
1338                .expect("Closed rx1_2");
1339        assert_eq!(
1340            received_p2_ch1.value, debug_struct_arc_2,
1341            "Second payload value mismatch for reactant 1"
1342        );
1343        assert_eq!(
1344            received_p2_ch1.correlation_id(), correlation_uuid2,
1345            "Second correlation ID mismatch for reactant 1"
1346        );
1347
1348        let received_p2_ch2 =
1349            tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1350                .await
1351                .expect("Timeout rx2_2")
1352                .expect("Closed rx2_2");
1353        assert_eq!(
1354            received_p2_ch2.value, debug_struct_arc_2,
1355            "Second payload value mismatch for reactant 2"
1356        );
1357        assert_eq!(
1358            received_p2_ch2.correlation_id(), correlation_uuid2,
1359            "Second correlation ID mismatch for reactant 2"
1360        );
1361
1362        // Ensure channels are empty now, indicating no unexpected messages
1363        assert_eq!(
1364            rx1.len(),
1365            0,
1366            "Reactant 1 channel should be empty after all expected messages"
1367        );
1368        assert_eq!(
1369            rx2.len(),
1370            0,
1371            "Reactant 2 channel should be empty after all expected messages"
1372        );
1373        assert_eq!(
1374            raw_rx1.len(),
1375            0,
1376            "Raw reactant 1 channel should be empty after all expected messages"
1377        );
1378        assert_eq!(
1379            raw_rx2.len(),
1380            0,
1381            "Raw reactant 2 channel should be empty after all expected messages"
1382        );
1383    }
1384
1385    #[tokio::test]
1386    async fn test_ganglion_inprocess_adapt_erased() {
1387        let ns = test_namespace();
1388
1389        // Create a neuron
1390        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1391        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
1392            Arc::new(neuron_impl);
1393
1394        // Create a ganglion
1395        let mut ganglion = GanglionInprocess::new();
1396
1397        // Use the regular adapt method instead of adapt_erased
1398        ganglion
1399            .adapt(neuron.clone())
1400            .await
1401            .expect("Failed to adapt neuron");
1402
1403        // Verify that a synapse was created and stored
1404        let neuron_name = neuron.name();
1405        let synapse = ganglion.get_synapse_by_name(&neuron_name);
1406        assert!(synapse.is_some());
1407    }
1408
1409    #[tokio::test]
1410    async fn test_ganglion_external_inprocess_across_threads() {
1411        use crate::ganglion::GanglionExternal;
1412        use crate::neuron::NeuronImpl;
1413        use crate::payload::PayloadRaw;
1414        use crate::test_utils::{
1415            DebugCodec, DebugStruct, GanglionExternalInprocess, TokioMpscReactant,
1416            TokioMpscReactantRaw, test_namespace,
1417        };
1418        use std::sync::Arc;
1419        use tokio::sync::mpsc::channel;
1420        use tokio::task;
1421        use tokio::time::{Duration, sleep};
1422        use uuid::Uuid;
1423
1424        // Create a struct to hold the shared state
1425        struct SharedState {
1426            // Channels to receive payloads from reactants
1427            tx1: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
1428            tx2: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
1429            raw_tx1: tokio::sync::mpsc::Sender<Arc<PayloadRaw<DebugStruct, DebugCodec>>>,
1430            raw_tx2: tokio::sync::mpsc::Sender<Arc<PayloadRaw<DebugStruct, DebugCodec>>>,
1431            // Counter for received payloads
1432            received_count: std::sync::atomic::AtomicUsize,
1433            raw_received_count: std::sync::atomic::AtomicUsize,
1434        }
1435
1436        // Create channels with large buffer to avoid blocking
1437        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
1438        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
1439        let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(100);
1440        let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(100);
1441
1442        // Create shared state
1443        let shared_state = Arc::new(SharedState {
1444            tx1,
1445            tx2,
1446            raw_tx1,
1447            raw_tx2,
1448            received_count: std::sync::atomic::AtomicUsize::new(0),
1449            raw_received_count: std::sync::atomic::AtomicUsize::new(0),
1450        });
1451
1452        // Number of threads and payloads per thread
1453        let num_threads = 10;
1454        let payloads_per_thread = 10;
1455        let total_payloads = num_threads * payloads_per_thread;
1456
1457        // Create a vector to store all task handles
1458        let mut handles = Vec::new();
1459
1460        // Spawn a task to receive payloads and count them
1461        let receiver_state = shared_state.clone();
1462        let receiver_handle = task::spawn(async move {
1463            let mut received_payloads = Vec::new();
1464            let mut received_raw_payloads = Vec::new();
1465
1466            // Collect payloads from all channels
1467            for _ in 0..total_payloads * 4 {
1468                // 2 regular + 2 raw channels
1469                tokio::select! {
1470                    Some(payload) = rx1.recv() => {
1471                        received_payloads.push(payload);
1472                        receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1473                    }
1474                    Some(payload) = rx2.recv() => {
1475                        received_payloads.push(payload);
1476                        receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1477                    }
1478                    Some(payload) = raw_rx1.recv() => {
1479                        received_raw_payloads.push(payload);
1480                        receiver_state.raw_received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1481                    }
1482                    Some(payload) = raw_rx2.recv() => {
1483                        received_raw_payloads.push(payload);
1484                        receiver_state.raw_received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1485                    }
1486                }
1487            }
1488
1489            (received_payloads, received_raw_payloads)
1490        });
1491
1492        // Create shared namespace, neuron, and ganglion outside the tasks
1493        let ns = test_namespace();
1494        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1495        let neuron = neuron_impl.clone_to_arc();
1496
1497        // Create a single shared ganglion with reactants
1498        let reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
1499            erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
1500                sender: shared_state.tx1.clone(),
1501            })),
1502            erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
1503                sender: shared_state.tx2.clone(),
1504            })),
1505        ];
1506
1507        let raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
1508            erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
1509                sender: shared_state.raw_tx1.clone(),
1510            })),
1511            erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
1512                sender: shared_state.raw_tx2.clone(),
1513            })),
1514        ];
1515
1516        let mut shared_ganglion = GanglionExternalInprocess::new();
1517        shared_ganglion
1518            .adapt(neuron.clone())
1519            .await
1520            .expect("Failed to adapt neuron");
1521        shared_ganglion
1522            .react(neuron.name(), reactants, raw_reactants, vec![])
1523            .await
1524            .expect("Failed to react");
1525
1526        let shared_ganglion = Arc::new(tokio::sync::Mutex::new(shared_ganglion));
1527
1528        // Spawn multiple tasks that will transmit payloads using the shared ganglion
1529        for thread_id in 0..num_threads {
1530            let ganglion = shared_ganglion.clone();
1531            let neuron_clone = neuron.clone();
1532            let neuron_impl_clone = neuron_impl.clone();
1533
1534            // Spawn a new task
1535            let handle = task::spawn(async move {
1536                // Transmit payloads through this task's ganglion
1537                for i in 0..payloads_per_thread {
1538                    // Create a unique payload for this thread and iteration
1539                    let payload_id = thread_id * payloads_per_thread + i;
1540                    let debug_struct = Arc::new(DebugStruct {
1541                        foo: payload_id as i32,
1542                        bar: format!("external_thread_{thread_id}_payload_{i}"),
1543                    });
1544
1545                    let correlation_uuid = Uuid::now_v7();
1546                    let encoded = neuron_impl_clone
1547                        .encode(debug_struct.as_ref())
1548                        .expect("Encoding should succeed in test");
1549                    let payload_raw = PayloadRaw::with_correlation(
1550                        encoded,
1551                        neuron_clone.clone(),
1552                        Some(correlation_uuid),
1553                    );
1554
1555                    // Add a small delay to increase the chance of thread interleaving
1556                    sleep(Duration::from_millis(1)).await;
1557
1558                    // Transmit the payload through the shared ganglion
1559                    let mut ganglion_guard = ganglion.lock().await;
1560                    let erased_payload = erase_payload_raw(payload_raw);
1561                    ganglion_guard
1562                        .transmit_encoded(erased_payload)
1563                        .await
1564                        .expect("Failed to transmit encoded payload");
1565                }
1566            });
1567
1568            handles.push(handle);
1569        }
1570
1571        // Wait for all transmitter tasks to complete
1572        for handle in handles {
1573            handle.await.unwrap();
1574        }
1575
1576        // Wait for the receiver task to complete and get the received payloads
1577        let (received_payloads, received_raw_payloads) = receiver_handle.await.unwrap();
1578
1579        // Verify that we received the expected number of payloads
1580        assert_eq!(
1581            shared_state
1582                .received_count
1583                .load(std::sync::atomic::Ordering::SeqCst),
1584            total_payloads * 2,
1585            "Should have received all decoded payloads on both regular reactants"
1586        );
1587
1588        assert_eq!(
1589            shared_state
1590                .raw_received_count
1591                .load(std::sync::atomic::Ordering::SeqCst),
1592            total_payloads * 2,
1593            "Should have received all raw payloads on both raw reactants"
1594        );
1595
1596        // Verify that we received payloads from all threads (regular reactants)
1597        let mut foo_values = received_payloads
1598            .iter()
1599            .map(|p| p.value.foo)
1600            .collect::<Vec<_>>();
1601        foo_values.sort();
1602        foo_values.dedup();
1603
1604        assert_eq!(
1605            foo_values.len(),
1606            total_payloads,
1607            "Should have received payloads with all expected foo values"
1608        );
1609
1610        // Check that the foo values match the expected range
1611        for i in 0..total_payloads {
1612            assert!(
1613                foo_values.contains(&(i as i32)),
1614                "Should have received a payload with foo={i}"
1615            );
1616        }
1617
1618        // Verify that all correlation_ids are preserved (regular reactants)
1619        let mut correlation_ids = received_payloads
1620            .iter()
1621            .map(|p| p.correlation_id())
1622            .collect::<Vec<_>>();
1623
1624        // Each correlation_id should appear exactly twice (once from each reactant)
1625        // So we should have total_payloads unique correlation_ids
1626        correlation_ids.sort();
1627
1628        // Count occurrences of each correlation_id
1629        let mut correlation_id_counts = std::collections::HashMap::new();
1630        for id in &correlation_ids {
1631            *correlation_id_counts.entry(*id).or_insert(0) += 1;
1632        }
1633
1634        // Verify we have the expected number of unique correlation_ids (regular reactants)
1635        assert_eq!(
1636            correlation_id_counts.len(),
1637            total_payloads,
1638            "Should have received payloads with all expected correlation_ids"
1639        );
1640
1641        // Verify each correlation_id appears exactly twice (once from each reactant)
1642        for (id, count) in correlation_id_counts {
1643            assert_eq!(
1644                count, 2,
1645                "Correlation ID {id} should appear exactly twice (once from each regular reactant)"
1646            );
1647        }
1648
1649        // Verify that all correlation_ids are preserved (raw reactants)
1650        let mut raw_correlation_ids = received_raw_payloads
1651            .iter()
1652            .map(|p| p.correlation_id())
1653            .collect::<Vec<_>>();
1654
1655        raw_correlation_ids.sort();
1656
1657        // Count occurrences of each correlation_id for raw reactants
1658        let mut raw_correlation_id_counts = std::collections::HashMap::new();
1659        for id in &raw_correlation_ids {
1660            *raw_correlation_id_counts.entry(*id).or_insert(0) += 1;
1661        }
1662
1663        // Verify we have the expected number of unique correlation_ids for raw reactants
1664        assert_eq!(
1665            raw_correlation_id_counts.len(),
1666            total_payloads,
1667            "Should have received raw payloads with all expected correlation_ids"
1668        );
1669
1670        // Verify each correlation_id appears exactly twice for raw reactants (once from each raw reactant)
1671        for (id, count) in raw_correlation_id_counts {
1672            assert_eq!(
1673                count, 2,
1674                "Correlation ID {id} should appear exactly twice (once from each raw reactant)"
1675            );
1676        }
1677    }
1678}