1use crate::codec::{Codec, CodecName};
8use crate::erasure::payload::{PayloadErased, PayloadRawErased};
9use crate::erasure::reactant::{ErrorReactantErased, ReactantErased, ReactantRawErased};
10use crate::erasure::synapse::{SynapseInternalErased, erase_synapse_internal};
11use crate::logging::LogTrace;
12use crate::neuron::{Neuron, NeuronError};
13use crate::synapse::SynapseInprocess;
14use crate::utils::struct_name_of_type;
15use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::future::Future;
18use std::pin::Pin;
19use std::sync::Arc;
20use parking_lot::RwLock;
21use thiserror::Error;
22use tokio::sync::Mutex;
23use tracing::Instrument;
24use uuid::Uuid;
25
26#[derive(Error, Debug)]
27pub enum GanglionError {
28 #[error("Synapse not found: {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
29 SynapseNotFound {
30 neuron_name: String,
31 ganglion_name: String,
32 ganglion_id: Uuid,
33 },
34 #[error(
35 "Failed to acquire lock on synapse: {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})"
36 )]
37 SynapseLock {
38 neuron_name: String,
39 ganglion_name: String,
40 ganglion_id: Uuid,
41 },
42 #[error(
43 "Transmission error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id}): {message}"
44 )]
45 Transmit {
46 neuron_name: String,
47 ganglion_name: String,
48 ganglion_id: Uuid,
49 message: String,
50 },
51 #[error("Encode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
52 Encode {
53 neuron_name: String,
54 ganglion_name: String,
55 ganglion_id: Uuid,
56 },
57 #[error("Decode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
58 Decode {
59 neuron_name: String,
60 ganglion_name: String,
61 ganglion_id: Uuid,
62 },
63 #[error(
64 "Adaptation error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})"
65 )]
66 Adapt {
67 neuron_name: String,
68 ganglion_name: String,
69 ganglion_id: Uuid,
70 },
71 #[error("Queue full for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
72 QueueFull {
73 neuron_name: String,
74 ganglion_name: String,
75 ganglion_id: Uuid,
76 },
77}
78
79impl GanglionError {
80 pub fn from_neuron_error(
82 neuron_error: NeuronError,
83 ganglion_name: String,
84 ganglion_id: Uuid,
85 ) -> Self {
86 match neuron_error {
87 NeuronError::Encode { neuron_name, .. } => GanglionError::Encode {
88 neuron_name,
89 ganglion_name,
90 ganglion_id,
91 },
92 NeuronError::Decode { neuron_name, .. } => GanglionError::Decode {
93 neuron_name,
94 ganglion_name,
95 ganglion_id,
96 },
97 }
98 }
99
100 pub fn from_plexus_error(
102 plexus_error: crate::plexus::PlexusError,
103 neuron_name: String,
104 ganglion_name: String,
105 ganglion_id: Uuid,
106 ) -> Self {
107 match plexus_error {
108 crate::plexus::PlexusError::Ganglion(ganglion_error) => ganglion_error,
110 _ => GanglionError::Adapt {
112 neuron_name,
113 ganglion_name,
114 ganglion_id,
115 },
116 }
117 }
118}
119
120pub trait Ganglion {
121 fn capable<T, C>(&mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
122 where
123 C: Codec<T> + CodecName + Send + Sync + 'static,
124 T: Send + Sync + 'static;
125
126 fn adapt<T, C>(
127 &mut self,
128 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
129 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>
130 where
131 C: Codec<T> + CodecName + Send + Sync + 'static,
132 T: Send + Sync + 'static;
133}
134
135pub trait GanglionInternal {
137 fn transmit(
138 &mut self,
139 payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
140 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>>;
141
142 fn react(
143 &mut self,
144 neuron_name: String,
145 reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
146 error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
147 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>;
148
149 fn react_many(
151 &mut self,
152 reactions: HashMap<
153 String,
154 (
155 Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
156 Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
157 ),
158 >,
159 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
160 let mut futures = Vec::new();
161 for (name, (rs, ers)) in reactions {
162 futures.push(self.react(name, rs, ers));
163 }
164 Box::pin(async move {
165 for f in futures {
166 f.await?;
167 }
168 Ok(())
169 })
170 }
171
172 fn unique_id(&self) -> Uuid;
173}
174
175pub trait GanglionExternal {
177 fn transmit(
178 &mut self,
179 payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
180 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>>;
181
182 #[allow(clippy::type_complexity)]
183 fn transmit_encoded(
184 &mut self,
185 payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
186 ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), GanglionError>> + Send + 'static>>;
187
188 fn react(
189 &mut self,
190 neuron_name: String,
191 reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
192 raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
193 error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
194 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>;
195
196 fn react_many(
198 &mut self,
199 reactions: HashMap<
200 String,
201 (
202 Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
203 Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
204 Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
205 ),
206 >,
207 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
208 let mut futures = Vec::new();
209 for (name, (rs, rrs, ers)) in reactions {
210 futures.push(self.react(name, rs, rrs, ers));
211 }
212 Box::pin(async move {
213 for f in futures {
214 f.await?;
215 }
216 Ok(())
217 })
218 }
219
220 fn unique_id(&self) -> Uuid;
221}
222
223pub async fn adapt_all<T, C, G>(
227 ganglia: &[Arc<Mutex<G>>],
228 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
229) -> Result<(), GanglionError>
230where
231 C: Codec<T> + CodecName + Send + Sync + 'static,
232 T: Send + Sync + 'static,
233 G: Ganglion + ?Sized,
234{
235 for ganglion_mutex in ganglia {
236 let mut ganglion = ganglion_mutex.lock().await;
237 ganglion.adapt(neuron.clone()).await?;
238 }
239 Ok(())
240}
241
/// Adapts one neuron to one or more ganglia.
///
/// Expands to an `async` block that, for each ganglion expression in order,
/// awaits `.lock()`, calls `.adapt(neuron.clone())` on the guard and awaits
/// the result, bailing out with `?` on the first error. The block evaluates
/// to `Result<(), GanglionError>` and must itself be awaited by the caller.
///
/// Note that the neuron expression is expanded once per ganglion.
///
/// ```ignore
/// adapt_all!(neuron, ganglion_a, ganglion_b).await?;
/// ```
#[macro_export]
macro_rules! adapt_all {
    ($neuron:expr, $($ganglion:expr),+ $(,)?) => {
        async {
            $(
                $ganglion.lock().await.adapt($neuron.clone()).await?;
            )+
            // Fully qualified: item paths in `macro_rules!` resolve at the
            // call site, where `Result` may be shadowed by a local alias.
            ::std::result::Result::<(), $crate::ganglion::GanglionError>::Ok(())
        }
    };
}
283
/// Adapts several neurons to one or more ganglia.
///
/// Expands to an `async` block evaluating to `Result<(), GanglionError>`.
/// For each ganglion, in order, the lock is taken once and every neuron in
/// the bracketed list is adapted while that lock is held; the lock is
/// released before the next ganglion is processed. The first error aborts
/// the whole sequence.
///
/// Note that each neuron expression is expanded once per ganglion.
///
/// ```ignore
/// adapt_many!([neuron_a, neuron_b], ganglion_x, ganglion_y).await?;
/// ```
#[macro_export]
macro_rules! adapt_many {
    // Base case: a single ganglion.
    ([$($neuron:expr),+ $(,)?], $ganglion:expr $(,)?) => {
        async {
            let mut g = $ganglion.lock().await;
            $(
                g.adapt($neuron.clone()).await?;
            )+
            // Fully qualified: item paths in `macro_rules!` resolve at the
            // call site, where `Result` may be shadowed by a local alias.
            ::std::result::Result::<(), $crate::ganglion::GanglionError>::Ok(())
        }
    };

    // Recursive case: handle the head ganglion, then recurse on the tail.
    ([$($neuron:expr),+ $(,)?], $head_ganglion:expr, $($tail_ganglion:expr),+ $(,)?) => {
        async {
            {
                // Scoped so the head's lock is dropped before the tail runs.
                let mut g = $head_ganglion.lock().await;
                $(
                    g.adapt($neuron.clone()).await?;
                )+
            }
            $crate::adapt_many!([$($neuron),+], $($tail_ganglion),+).await
        }
    };
}
344
345pub struct GanglionInprocess {
346 id: Uuid,
347 synapses_by_name:
348 HashMap<String, Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>>,
349 relevant_neurons: HashSet<String>,
351 ignored_neurons: HashSet<String>,
353}
354
355impl GanglionInprocess {
356 pub fn new() -> Self {
357 Self {
358 id: Uuid::now_v7(),
359 synapses_by_name: HashMap::new(),
360 relevant_neurons: HashSet::new(),
361 ignored_neurons: HashSet::new(),
362 }
363 }
364
365 pub fn new_shared() -> Arc<Mutex<Self>> {
367 Arc::new(Mutex::new(Self::new()))
368 }
369
370 pub fn new_with_filters(
371 relevant_neurons: HashSet<String>,
372 ignored_neurons: HashSet<String>,
373 ) -> Self {
374 Self {
375 id: Uuid::now_v7(),
376 synapses_by_name: HashMap::new(),
377 relevant_neurons,
378 ignored_neurons,
379 }
380 }
381
382 fn get_synapse_by_name(
383 &self,
384 name: &str,
385 ) -> Option<Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>> {
386 self.synapses_by_name.get(name).cloned()
387 }
388}
389
390impl Default for GanglionInprocess {
391 fn default() -> Self {
392 Self::new()
393 }
394}
395
396impl Ganglion for GanglionInprocess {
397 fn capable<T, C>(&mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
398 where
399 C: Codec<T> + CodecName + Send + Sync + 'static,
400 T: Send + Sync + 'static,
401 {
402 let neuron_name = neuron.name();
403
404 if !self.relevant_neurons.is_empty() && !self.relevant_neurons.contains(&neuron_name) {
405 return false;
406 }
407
408 if !self.ignored_neurons.is_empty() && self.ignored_neurons.contains(&neuron_name) {
409 return false;
410 }
411
412 true
413 }
414
415 fn adapt<T, C>(
416 &mut self,
417 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
418 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>
419 where
420 C: Codec<T> + CodecName + Send + Sync + 'static,
421 T: Send + Sync + 'static,
422 {
423 if !self.capable(neuron.clone()) {
425 return Box::pin(async move {
426 Ok(()) });
428 }
429
430 let neuron_name = neuron.name();
431
432 if self.synapses_by_name.contains_key(&neuron_name) {
434 return Box::pin(async move {
435 Ok(()) });
437 }
438
439 let synapse = SynapseInprocess::<T, C>::new(neuron.clone(), vec![], vec![]);
441 let erased_synapse = erase_synapse_internal(synapse);
442
443 self.synapses_by_name
445 .insert(neuron_name.clone(), erased_synapse);
446
447 Box::pin(async move { Ok(()) })
448 }
449}
450
451impl GanglionInternal for GanglionInprocess {
452 fn transmit(
453 &mut self,
454 payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
455 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>> {
456 let neuron_name = payload.get_neuron_name();
457 tracing::debug!("GanglionInprocess::transmit called for neuron: {neuron_name}");
458
459 if let Some(synapse_lock) = self.get_synapse_by_name(&neuron_name) {
460 tracing::debug!("GanglionInprocess::transmit found synapse, acquiring read lock");
461 let synapse_guard = synapse_lock.read();
462 tracing::debug!(
463 "GanglionInprocess::transmit acquired read lock, calling transmit_erased"
464 );
465 let future = synapse_guard.transmit_erased(payload.clone());
466 let ganglion_id = self.id;
467 let ganglion_name = struct_name_of_type::<Self>().to_string();
468 Box::pin(
469 async move {
470 tracing::debug!(
471 "GanglionInprocess::transmit awaiting transmit_erased future"
472 );
473 let result = future.await;
474 tracing::debug!("GanglionInprocess::transmit transmit_erased completed");
475 result.map_err(|e| match e {
476 crate::synapse::SynapseError::QueueFull { neuron_name: _ } => {
477 GanglionError::QueueFull {
478 neuron_name: neuron_name.clone(),
479 ganglion_name: ganglion_name.clone(),
480 ganglion_id,
481 }
482 }
483 _ => GanglionError::Transmit {
484 neuron_name: neuron_name.clone(),
485 ganglion_name: ganglion_name.clone(),
486 ganglion_id,
487 message: e.to_string(),
488 },
489 })
490 }
491 .instrument(payload.span_debug("GanglionInprocess::transmit")),
492 )
493 } else {
494 tracing::debug!("GanglionInprocess::transmit synapse not found");
495 let ganglion_id = self.id;
496 let ganglion_name = struct_name_of_type::<GanglionInprocess>().to_string();
497 Box::pin(async move {
498 Err(GanglionError::SynapseNotFound {
499 neuron_name,
500 ganglion_name,
501 ganglion_id,
502 })
503 })
504 }
505 }
506
507 fn react(
508 &mut self,
509 neuron_name: String,
510 reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
511 error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
512 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
513 let synapse_lock_opt = self.get_synapse_by_name(&neuron_name);
515
516 if synapse_lock_opt.is_none() {
518 let ganglion_id = self.id;
519 let ganglion_name = struct_name_of_type::<GanglionInprocess>().to_string();
520 return Box::pin(async move {
521 Err(GanglionError::SynapseNotFound {
522 neuron_name,
523 ganglion_name,
524 ganglion_id,
525 })
526 });
527 }
528
529 let synapse_lock = synapse_lock_opt.unwrap();
531 let mut synapse_guard = synapse_lock.write();
532
533 synapse_guard.react_erased(reactants, error_reactants);
535
536 Box::pin(async move { Ok(()) })
539 }
540
541 fn react_many(
542 &mut self,
543 reactions: HashMap<
544 String,
545 (
546 Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
547 Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
548 ),
549 >,
550 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
551 for (neuron_name, (reactants, error_reactants)) in reactions {
552 if let Some(synapse_lock) = self.get_synapse_by_name(&neuron_name) {
553 let mut synapse_guard = synapse_lock.write();
554 synapse_guard.react_erased(reactants, error_reactants);
555 } else {
556 let ganglion_id = self.id;
557 let ganglion_name = struct_name_of_type::<GanglionInprocess>().to_string();
558 return Box::pin(async move {
559 Err(GanglionError::SynapseNotFound {
560 neuron_name,
561 ganglion_name,
562 ganglion_id,
563 })
564 });
565 }
566 }
567 Box::pin(async move { Ok(()) })
568 }
569
570 fn unique_id(&self) -> Uuid {
571 self.id
572 }
573}
574
575#[cfg(test)]
576mod tests {
577 use super::*;
578 use crate::erasure::payload::{erase_payload, erase_payload_raw};
579 use crate::erasure::reactant::{
580 ReactantErased, ReactantRawErased, erase_reactant, erase_reactant_raw,
581 };
582 use crate::namespace::NamespaceImpl;
583 use crate::neuron::{Neuron, NeuronImpl};
584 use crate::payload::{Payload, PayloadRaw};
585 use crate::test_utils::{
586 DebugCodec, DebugStruct, GanglionExternalInprocess, PingCodec, PingMsg, PingNeuron,
587 TokioMpscReactant, TokioMpscReactantGeneric, TokioMpscReactantRaw, test_namespace,
588 };
589 use std::collections::HashSet;
590 use std::sync::Arc;
591 use std::time::Duration;
592 use tokio::sync::mpsc::channel;
593 use tokio::task;
594 use tokio::time::sleep;
595 use uuid::Uuid;
596
597 #[test]
598 fn test_ganglion_error_with_ganglion_name() {
599 let ganglion_id = Uuid::now_v7();
600 let ganglion_name = struct_name_of_type::<GanglionInprocess>().to_string();
601
602 let synapse_not_found = GanglionError::SynapseNotFound {
604 neuron_name: "test_neuron".to_string(),
605 ganglion_name: ganglion_name.clone(),
606 ganglion_id,
607 };
608 assert!(synapse_not_found.to_string().contains("test_neuron"));
609 assert!(synapse_not_found.to_string().contains("GanglionInprocess"));
610
611 let synapse_lock_error = GanglionError::SynapseLock {
613 neuron_name: "test_neuron".to_string(),
614 ganglion_name: ganglion_name.clone(),
615 ganglion_id,
616 };
617 assert!(synapse_lock_error.to_string().contains("test_neuron"));
618 assert!(synapse_lock_error.to_string().contains("GanglionInprocess"));
619
620 let transmit_error = GanglionError::Transmit {
622 neuron_name: "test_neuron".to_string(),
623 ganglion_name: ganglion_name.clone(),
624 ganglion_id,
625 message: "test failure".to_string(),
626 };
627 assert!(transmit_error.to_string().contains("test_neuron"));
628 assert!(transmit_error.to_string().contains("GanglionInprocess"));
629 assert!(transmit_error.to_string().contains("test failure"));
630 }
631 #[tokio::test]
634 async fn test_ganglion_inprocess_get_synapse_by_name() {
635 let ns = test_namespace();
636 let neuron_impl_instance: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
637 let neuron_name_str = neuron_impl_instance.name();
638
639 let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
640 Arc::new(neuron_impl_instance);
641
642 let mut ganglion = GanglionInprocess::new();
643
644 ganglion
646 .adapt(neuron.clone())
647 .await
648 .expect("Failed to adapt neuron for test");
649
650 let result_by_name1 = ganglion.get_synapse_by_name(&neuron_name_str);
651 assert!(result_by_name1.is_some());
652 let synapse_by_name1 = result_by_name1.unwrap();
653
654 let result_by_name2 = ganglion.get_synapse_by_name(&neuron_name_str);
655 assert!(result_by_name2.is_some());
656 let synapse_by_name2 = result_by_name2.unwrap();
657
658 assert!(
659 Arc::ptr_eq(&synapse_by_name1, &synapse_by_name2),
660 "Repeated calls to get_synapse_by_name should yield the same Arc instance."
661 );
662
663 let non_existent_result = ganglion.get_synapse_by_name("non_existent_neuron");
664 assert!(non_existent_result.is_none());
665 }
666
667 #[tokio::test]
668 async fn test_ganglion_inprocess_transmit_via_adapt() {
669 let ns = test_namespace();
670
671 let (tx1, mut rx1) = channel::<Arc<Payload<PingMsg, PingCodec>>>(10);
672 let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
673
674 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
675
676 let neuron_arc = neuron_impl.clone_to_arc();
677
678 let mut ganglion: GanglionInprocess = GanglionInprocess::new();
679
680 ganglion
682 .adapt(neuron_arc.clone())
683 .await
684 .expect("Failed to adapt neuron");
685
686 let ping_neuron = Arc::new(PingNeuron::new(test_namespace()));
687 ganglion
688 .adapt(ping_neuron.clone())
689 .await
690 .expect("Failed to adapt ping neuron");
691
692 let reactants1: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> =
693 vec![erase_reactant::<PingMsg, PingCodec, _>(Box::new(
694 TokioMpscReactantGeneric::new(tx1.clone()),
695 ))];
696 let reactants2: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> =
697 vec![erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(
698 TokioMpscReactantGeneric::new(tx2.clone()),
699 ))];
700
701 ganglion
702 .react(ping_neuron.name(), reactants1, vec![])
703 .await
704 .expect("Failed to react ping");
705 ganglion
706 .react(neuron_arc.name(), reactants2, vec![])
707 .await
708 .expect("Failed to react debug");
709
710 let correlation_uuid1 = Uuid::now_v7();
711 let payload1 = Payload::builder()
712 .value(PingMsg { seq: 1 })
713 .correlation_id(correlation_uuid1)
714 .neuron(ping_neuron)
715 .build()
716 .unwrap();
717
718 let erased_payload1 = erase_payload(payload1);
719 ganglion
720 .transmit(erased_payload1)
721 .await
722 .expect("Failed to transmit payload1");
723
724 assert_eq!(
725 rx1.len(),
726 1,
727 "Reactant 1 should have received the first message (ping)"
728 );
729 let received_p1_ch1 = rx1.recv().await.unwrap();
730 assert_eq!(
731 received_p1_ch1.value.seq, 1,
732 "Payload value mismatch for reactant 1"
733 );
734 assert_eq!(
735 received_p1_ch1.correlation_id(), correlation_uuid1,
736 "Correlation ID mismatch for reactant 1"
737 );
738
739 let debug_struct_arc_2 = Arc::new(DebugStruct {
740 foo: 456,
741 bar: "ganglion_test_payload_2".to_string(),
742 });
743 let correlation_uuid2 = Uuid::now_v7();
744 let payload2 = Payload::builder()
745 .value((*debug_struct_arc_2).clone())
746 .correlation_id(correlation_uuid2)
747 .neuron(neuron_arc.clone())
748 .build()
749 .unwrap();
750
751 let erased_payload2 = erase_payload(payload2);
752 ganglion
753 .transmit(erased_payload2)
754 .await
755 .expect("Failed to transmit payload2");
756
757 assert_eq!(
758 rx2.len(),
759 1,
760 "Reactant 2 should have received the second message (debug)"
761 );
762 let received_p2_ch1 = rx2.recv().await.unwrap();
763 assert_eq!(
764 received_p2_ch1.value, debug_struct_arc_2,
765 "Second payload value mismatch for reactant 2"
766 );
767 assert_eq!(
768 received_p2_ch1.correlation_id(), correlation_uuid2,
769 "Second correlation ID mismatch for reactant 2"
770 );
771
772 assert_eq!(
774 rx1.len(),
775 0,
776 "Reactant 1 channel should be empty after all expected messages"
777 );
778 assert_eq!(
779 rx2.len(),
780 0,
781 "Reactant 2 channel should be empty after all expected messages"
782 );
783 }
784
785 #[tokio::test]
786 async fn test_ganglion_inprocess_across_threads() {
787 struct SharedState {
789 tx1: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
791 tx2: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
792 received_count: std::sync::atomic::AtomicUsize,
794 }
795
796 let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
798 let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
799
800 let shared_state = Arc::new(SharedState {
802 tx1,
803 tx2,
804 received_count: std::sync::atomic::AtomicUsize::new(0),
805 });
806
807 let num_threads = 10;
809 let payloads_per_thread = 10;
810 let total_payloads = num_threads * payloads_per_thread;
811
812 let mut handles = Vec::new();
814
815 let receiver_state = shared_state.clone();
817 let receiver_handle = task::spawn(async move {
818 let mut received_payloads = Vec::new();
819
820 for _ in 0..total_payloads * 2 {
822 tokio::select! {
823 Some(payload) = rx1.recv() => {
824 received_payloads.push(payload);
825 receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
826 }
827 Some(payload) = rx2.recv() => {
828 received_payloads.push(payload);
829 receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
830 }
831 }
832 }
833
834 received_payloads
835 });
836
837 let ns = test_namespace();
839 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
840 let neuron = neuron_impl.clone_to_arc();
841
842 let reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
843 erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
844 sender: shared_state.tx1.clone(),
845 })),
846 erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
847 sender: shared_state.tx2.clone(),
848 })),
849 ];
850
851 let mut shared_ganglion = GanglionInprocess::new();
852 shared_ganglion
854 .adapt(neuron.clone())
855 .await
856 .expect("Failed to adapt neuron");
857 shared_ganglion
858 .react(neuron.name(), reactants, vec![])
859 .await
860 .expect("Failed to react");
861
862 let shared_ganglion = Arc::new(tokio::sync::Mutex::new(shared_ganglion));
863
864 for thread_id in 0..num_threads {
866 let ganglion = shared_ganglion.clone();
867 let neuron_clone = neuron.clone();
868
869 let handle = task::spawn(async move {
871 for i in 0..payloads_per_thread {
873 let payload_id = thread_id * payloads_per_thread + i;
875 let debug_struct = Arc::new(DebugStruct {
876 foo: payload_id as i32,
877 bar: format!("thread_{thread_id}_payload_{i}"),
878 });
879
880 let correlation_uuid = Uuid::now_v7();
881 let payload = Payload::builder()
882 .value((*debug_struct).clone())
883 .correlation_id(correlation_uuid)
884 .neuron(neuron_clone.clone())
885 .build()
886 .unwrap();
887
888 sleep(Duration::from_millis(1)).await;
890
891 let mut ganglion_guard = ganglion.lock().await;
893 let erased_payload = erase_payload(payload);
894 let _ = ganglion_guard.transmit(erased_payload).await;
895 }
896 });
897
898 handles.push(handle);
899 }
900
901 for handle in handles {
903 handle.await.unwrap();
904 }
905
906 let received_payloads = receiver_handle.await.unwrap();
908
909 assert_eq!(
911 shared_state
912 .received_count
913 .load(std::sync::atomic::Ordering::SeqCst),
914 total_payloads * 2,
915 "Should have received all payloads on both reactants"
916 );
917
918 let mut foo_values = received_payloads
920 .iter()
921 .map(|p| p.value.foo)
922 .collect::<Vec<_>>();
923 foo_values.sort();
924 foo_values.dedup();
925
926 assert_eq!(
927 foo_values.len(),
928 total_payloads,
929 "Should have received payloads with all expected foo values"
930 );
931
932 for i in 0..total_payloads {
934 assert!(
935 foo_values.contains(&(i as i32)),
936 "Should have received a payload with foo={i}"
937 );
938 }
939
940 let mut correlation_ids = received_payloads
942 .iter()
943 .map(|p| p.correlation_id())
944 .collect::<Vec<_>>();
945
946 correlation_ids.sort();
949
950 let mut correlation_id_counts = std::collections::HashMap::new();
952 for id in &correlation_ids {
953 *correlation_id_counts.entry(*id).or_insert(0) += 1;
954 }
955
956 assert_eq!(
958 correlation_id_counts.len(),
959 total_payloads,
960 "Should have received payloads with all expected correlation_ids"
961 );
962
963 for (id, count) in correlation_id_counts {
965 assert_eq!(
966 count, 2,
967 "Correlation ID {id} should appear exactly twice (once from each reactant)"
968 );
969 }
970 }
971
972 #[tokio::test]
973 async fn test_ganglion_external_unique_id() {
974 use crate::test_utils::GanglionExternalInprocess;
975 let ganglion1 = GanglionExternalInprocess::new();
976 let ganglion2 = GanglionExternalInprocess::new();
977
978 assert_ne!(ganglion1.unique_id(), ganglion2.unique_id());
980
981 assert_eq!(ganglion1.unique_id(), ganglion1.unique_id());
983 }
984
985 #[tokio::test]
986 async fn test_ganglion_inprocess_capable_with_relevant_neurons() {
987 let neuron1 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
989 NamespaceImpl {
990 delimiter: ".",
991 parts: vec!["dev", "plexo", "1"],
992 },
993 )));
994 let neuron2 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
995 NamespaceImpl {
996 delimiter: ".",
997 parts: vec!["dev", "plexo", "2"],
998 },
999 )));
1000 let mut relevant_neurons = HashSet::new();
1002 relevant_neurons.insert("dev.plexo.1.DebugStruct.debug".to_string());
1003 let mut ganglion = GanglionInprocess::new_with_filters(relevant_neurons, HashSet::new());
1004
1005 assert!(ganglion.capable(neuron1.clone()));
1007
1008 assert!(!ganglion.capable(neuron2.clone()));
1010 }
1011
1012 #[tokio::test]
1013 async fn test_ganglion_inprocess_capable_with_ignored_neurons() {
1014 let neuron1 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
1016 NamespaceImpl {
1017 delimiter: ".",
1018 parts: vec!["dev", "plexo", "1"],
1019 },
1020 )));
1021 let neuron2 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
1022 NamespaceImpl {
1023 delimiter: ".",
1024 parts: vec!["dev", "plexo", "2"],
1025 },
1026 )));
1027
1028 let mut ignored_neurons = HashSet::new();
1030 ignored_neurons.insert("dev.plexo.1.DebugStruct.debug".to_string());
1031 let mut ganglion = GanglionInprocess::new_with_filters(HashSet::new(), ignored_neurons);
1032
1033 assert!(!ganglion.capable(neuron1.clone()));
1035
1036 assert!(ganglion.capable(neuron2.clone()));
1038 }
1039
1040 #[tokio::test]
1041 async fn test_ganglion_external_inprocess_capable_with_relevant_neurons() {
1042 let neuron1 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
1044 NamespaceImpl {
1045 delimiter: ".",
1046 parts: vec!["dev", "plexo", "1"],
1047 },
1048 )));
1049 let neuron2 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
1050 NamespaceImpl {
1051 delimiter: ".",
1052 parts: vec!["dev", "plexo", "2"],
1053 },
1054 )));
1055
1056 let mut relevant_neurons = HashSet::new();
1058 relevant_neurons.insert("dev.plexo.1.DebugStruct.debug".to_string());
1059 let mut ganglion =
1060 GanglionExternalInprocess::new_with_filters(relevant_neurons, HashSet::new());
1061
1062 assert!(ganglion.capable(neuron1.clone()));
1064
1065 assert!(!ganglion.capable(neuron2.clone()));
1067 }
1068
1069 #[tokio::test]
1070 async fn test_ganglion_external_inprocess_capable_with_ignored_neurons() {
1071 let neuron1 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
1073 NamespaceImpl {
1074 delimiter: ".",
1075 parts: vec!["dev", "plexo", "1"],
1076 },
1077 )));
1078 let neuron2 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
1079 NamespaceImpl {
1080 delimiter: ".",
1081 parts: vec!["dev", "plexo", "2"],
1082 },
1083 )));
1084
1085 let mut ignored_neurons = HashSet::new();
1087 ignored_neurons.insert("dev.plexo.1.DebugStruct.debug".to_string());
1088 let mut ganglion =
1089 GanglionExternalInprocess::new_with_filters(HashSet::new(), ignored_neurons);
1090
1091 assert!(!ganglion.capable(neuron1.clone()));
1093
1094 assert!(ganglion.capable(neuron2.clone()));
1096 }
1097
1098 #[tokio::test]
1099 async fn test_ganglion_inprocess_capable_default_behavior() {
1100 let neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
1102 NamespaceImpl {
1103 delimiter: ".",
1104 parts: vec!["dev", "plexo"],
1105 },
1106 )));
1107
1108 let mut ganglion = GanglionInprocess::new();
1110
1111 assert!(ganglion.capable(neuron.clone()));
1113 }
1114
1115 #[tokio::test]
1116 async fn test_ganglion_external_inprocess_capable_default_behavior() {
1117 let neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(test_namespace()));
1119
1120 let mut ganglion = GanglionExternalInprocess::new();
1122
1123 assert!(ganglion.capable(neuron.clone()));
1125 }
1126
1127 #[tokio::test]
1128 async fn test_ganglion_external_transmit_encoded() {
1129 let ns = test_namespace();
1130 let mut ganglion = GanglionExternalInprocess::new();
1131 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1132 let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
1133 Arc::new(neuron_impl.clone());
1134
1135 ganglion
1137 .adapt(neuron.clone())
1138 .await
1139 .expect("Failed to adapt neuron");
1140
1141 let debug_struct_value = DebugStruct {
1142 foo: 42,
1143 bar: "test_value".to_owned(),
1144 };
1145 let debug_struct_arc = Arc::new(debug_struct_value);
1146 let correlation_id = Uuid::now_v7();
1147 let encoded = neuron_impl
1148 .encode(debug_struct_arc.as_ref())
1149 .expect("Encoding should succeed in test");
1150
1151 let payload_raw =
1152 PayloadRaw::with_correlation(encoded, neuron.clone(), Some(correlation_id));
1153
1154 let erased_payload = erase_payload_raw(payload_raw);
1155 let result = ganglion
1156 .transmit_encoded(erased_payload)
1157 .await
1158 .expect("Failed to transmit encoded payload");
1159
1160 assert_eq!(result.0.len(), 0);
1162 assert_eq!(result.1.len(), 0);
1163 }
1164
1165 #[tokio::test]
1166 async fn test_ganglion_external_adapt() {
1167 let mut ganglion = GanglionExternalInprocess::new();
1168 let ns = test_namespace();
1169 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1170 let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
1171 Arc::new(neuron_impl);
1172
1173 ganglion
1175 .adapt(neuron)
1176 .await
1177 .expect("Failed to adapt neuron");
1178 }
1179
1180 #[tokio::test]
1181 async fn test_ganglion_external_inprocess_transmit_via_adapt() {
1182 let ns = test_namespace();
1183
1184 let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
1185 let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
1186 let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(2);
1187 let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(2);
1188
1189 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1190 let neuron_arc = neuron_impl.clone_to_arc();
1191
1192 let reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
1193 erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
1194 sender: tx1.clone(),
1195 })),
1196 erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
1197 sender: tx2.clone(),
1198 })),
1199 ];
1200
1201 let raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
1202 erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
1203 sender: raw_tx1.clone(),
1204 })),
1205 erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
1206 sender: raw_tx2.clone(),
1207 })),
1208 ];
1209
1210 let mut ganglion: GanglionExternalInprocess = GanglionExternalInprocess::new();
1211
1212 ganglion
1213 .adapt(neuron_arc.clone())
1214 .await
1215 .expect("Failed to adapt neuron");
1216 ganglion
1217 .react(neuron_arc.name(), reactants, raw_reactants, vec![])
1218 .await
1219 .expect("Failed to react");
1220
1221 let debug_struct_arc = Arc::new(DebugStruct {
1222 foo: 123,
1223 bar: "ganglion_external_test_payload_1".to_string(),
1224 });
1225 let correlation_uuid1 = Uuid::now_v7();
1226 let encoded = neuron_impl
1227 .encode(debug_struct_arc.as_ref())
1228 .expect("Encoding should succeed in test");
1229 let payload_raw1 = PayloadRaw::with_correlation(
1230 encoded,
1231 neuron_arc.clone(),
1232 Some(correlation_uuid1),
1233 );
1234
1235 let erased_payload1 = erase_payload_raw(payload_raw1);
1236 ganglion
1237 .transmit_encoded(erased_payload1)
1238 .await
1239 .expect("Failed to transmit encoded payload1");
1240
1241 let received_raw_p1_ch1 =
1243 tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx1.recv())
1244 .await
1245 .expect("Timeout raw_rx1")
1246 .expect("Closed raw_rx1");
1247 assert_eq!(
1248 received_raw_p1_ch1.correlation_id(), correlation_uuid1,
1249 "Raw correlation ID mismatch for reactant 1"
1250 );
1251
1252 let received_raw_p1_ch2 =
1253 tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx2.recv())
1254 .await
1255 .expect("Timeout raw_rx2")
1256 .expect("Closed raw_rx2");
1257 assert_eq!(
1258 received_raw_p1_ch2.correlation_id(), correlation_uuid1,
1259 "Raw correlation ID mismatch for reactant 2"
1260 );
1261
1262 let received_p1_ch1 =
1264 tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
1265 .await
1266 .expect("Timeout rx1")
1267 .expect("Closed rx1");
1268 assert_eq!(
1269 received_p1_ch1.value, debug_struct_arc,
1270 "Payload value mismatch for reactant 1"
1271 );
1272 assert_eq!(
1273 received_p1_ch1.correlation_id(), correlation_uuid1,
1274 "Correlation ID mismatch for reactant 1"
1275 );
1276
1277 let received_p1_ch2 =
1278 tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1279 .await
1280 .expect("Timeout rx2")
1281 .expect("Closed rx2");
1282 assert_eq!(
1283 received_p1_ch2.value, debug_struct_arc,
1284 "Payload value mismatch for reactant 2"
1285 );
1286 assert_eq!(
1287 received_p1_ch2.correlation_id(), correlation_uuid1,
1288 "Correlation ID mismatch for reactant 2"
1289 );
1290
1291 let debug_struct_arc_2 = Arc::new(DebugStruct {
1293 foo: 456,
1294 bar: "ganglion_external_test_payload_2".to_string(),
1295 });
1296 let correlation_uuid2 = Uuid::now_v7();
1297 let encoded2 = neuron_impl
1298 .encode(debug_struct_arc_2.as_ref())
1299 .expect("Encoding should succeed in test");
1300 let payload_raw2 = PayloadRaw::with_correlation(
1301 encoded2,
1302 neuron_arc.clone(),
1303 Some(correlation_uuid2),
1304 );
1305
1306 let erased_payload2 = erase_payload_raw(payload_raw2);
1307 ganglion
1308 .transmit_encoded(erased_payload2)
1309 .await
1310 .expect("Failed to transmit encoded payload2");
1311
1312 let received_raw_p2_ch1 =
1314 tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx1.recv())
1315 .await
1316 .expect("Timeout raw_rx1_2")
1317 .expect("Closed raw_rx1_2");
1318 assert_eq!(
1319 received_raw_p2_ch1.correlation_id(), correlation_uuid2,
1320 "Second raw correlation ID mismatch for reactant 1"
1321 );
1322
1323 let received_raw_p2_ch2 =
1324 tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx2.recv())
1325 .await
1326 .expect("Timeout raw_rx2_2")
1327 .expect("Closed raw_rx2_2");
1328 assert_eq!(
1329 received_raw_p2_ch2.correlation_id(), correlation_uuid2,
1330 "Second raw correlation ID mismatch for reactant 2"
1331 );
1332
1333 let received_p2_ch1 =
1335 tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
1336 .await
1337 .expect("Timeout rx1_2")
1338 .expect("Closed rx1_2");
1339 assert_eq!(
1340 received_p2_ch1.value, debug_struct_arc_2,
1341 "Second payload value mismatch for reactant 1"
1342 );
1343 assert_eq!(
1344 received_p2_ch1.correlation_id(), correlation_uuid2,
1345 "Second correlation ID mismatch for reactant 1"
1346 );
1347
1348 let received_p2_ch2 =
1349 tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1350 .await
1351 .expect("Timeout rx2_2")
1352 .expect("Closed rx2_2");
1353 assert_eq!(
1354 received_p2_ch2.value, debug_struct_arc_2,
1355 "Second payload value mismatch for reactant 2"
1356 );
1357 assert_eq!(
1358 received_p2_ch2.correlation_id(), correlation_uuid2,
1359 "Second correlation ID mismatch for reactant 2"
1360 );
1361
1362 assert_eq!(
1364 rx1.len(),
1365 0,
1366 "Reactant 1 channel should be empty after all expected messages"
1367 );
1368 assert_eq!(
1369 rx2.len(),
1370 0,
1371 "Reactant 2 channel should be empty after all expected messages"
1372 );
1373 assert_eq!(
1374 raw_rx1.len(),
1375 0,
1376 "Raw reactant 1 channel should be empty after all expected messages"
1377 );
1378 assert_eq!(
1379 raw_rx2.len(),
1380 0,
1381 "Raw reactant 2 channel should be empty after all expected messages"
1382 );
1383 }
1384
1385 #[tokio::test]
1386 async fn test_ganglion_inprocess_adapt_erased() {
1387 let ns = test_namespace();
1388
1389 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1391 let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
1392 Arc::new(neuron_impl);
1393
1394 let mut ganglion = GanglionInprocess::new();
1396
1397 ganglion
1399 .adapt(neuron.clone())
1400 .await
1401 .expect("Failed to adapt neuron");
1402
1403 let neuron_name = neuron.name();
1405 let synapse = ganglion.get_synapse_by_name(&neuron_name);
1406 assert!(synapse.is_some());
1407 }
1408
1409 #[tokio::test]
1410 async fn test_ganglion_external_inprocess_across_threads() {
1411 use crate::ganglion::GanglionExternal;
1412 use crate::neuron::NeuronImpl;
1413 use crate::payload::PayloadRaw;
1414 use crate::test_utils::{
1415 DebugCodec, DebugStruct, GanglionExternalInprocess, TokioMpscReactant,
1416 TokioMpscReactantRaw, test_namespace,
1417 };
1418 use std::sync::Arc;
1419 use tokio::sync::mpsc::channel;
1420 use tokio::task;
1421 use tokio::time::{Duration, sleep};
1422 use uuid::Uuid;
1423
1424 struct SharedState {
1426 tx1: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
1428 tx2: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
1429 raw_tx1: tokio::sync::mpsc::Sender<Arc<PayloadRaw<DebugStruct, DebugCodec>>>,
1430 raw_tx2: tokio::sync::mpsc::Sender<Arc<PayloadRaw<DebugStruct, DebugCodec>>>,
1431 received_count: std::sync::atomic::AtomicUsize,
1433 raw_received_count: std::sync::atomic::AtomicUsize,
1434 }
1435
1436 let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
1438 let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
1439 let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(100);
1440 let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(100);
1441
1442 let shared_state = Arc::new(SharedState {
1444 tx1,
1445 tx2,
1446 raw_tx1,
1447 raw_tx2,
1448 received_count: std::sync::atomic::AtomicUsize::new(0),
1449 raw_received_count: std::sync::atomic::AtomicUsize::new(0),
1450 });
1451
1452 let num_threads = 10;
1454 let payloads_per_thread = 10;
1455 let total_payloads = num_threads * payloads_per_thread;
1456
1457 let mut handles = Vec::new();
1459
1460 let receiver_state = shared_state.clone();
1462 let receiver_handle = task::spawn(async move {
1463 let mut received_payloads = Vec::new();
1464 let mut received_raw_payloads = Vec::new();
1465
1466 for _ in 0..total_payloads * 4 {
1468 tokio::select! {
1470 Some(payload) = rx1.recv() => {
1471 received_payloads.push(payload);
1472 receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1473 }
1474 Some(payload) = rx2.recv() => {
1475 received_payloads.push(payload);
1476 receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1477 }
1478 Some(payload) = raw_rx1.recv() => {
1479 received_raw_payloads.push(payload);
1480 receiver_state.raw_received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1481 }
1482 Some(payload) = raw_rx2.recv() => {
1483 received_raw_payloads.push(payload);
1484 receiver_state.raw_received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1485 }
1486 }
1487 }
1488
1489 (received_payloads, received_raw_payloads)
1490 });
1491
1492 let ns = test_namespace();
1494 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1495 let neuron = neuron_impl.clone_to_arc();
1496
1497 let reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
1499 erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
1500 sender: shared_state.tx1.clone(),
1501 })),
1502 erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
1503 sender: shared_state.tx2.clone(),
1504 })),
1505 ];
1506
1507 let raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
1508 erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
1509 sender: shared_state.raw_tx1.clone(),
1510 })),
1511 erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
1512 sender: shared_state.raw_tx2.clone(),
1513 })),
1514 ];
1515
1516 let mut shared_ganglion = GanglionExternalInprocess::new();
1517 shared_ganglion
1518 .adapt(neuron.clone())
1519 .await
1520 .expect("Failed to adapt neuron");
1521 shared_ganglion
1522 .react(neuron.name(), reactants, raw_reactants, vec![])
1523 .await
1524 .expect("Failed to react");
1525
1526 let shared_ganglion = Arc::new(tokio::sync::Mutex::new(shared_ganglion));
1527
1528 for thread_id in 0..num_threads {
1530 let ganglion = shared_ganglion.clone();
1531 let neuron_clone = neuron.clone();
1532 let neuron_impl_clone = neuron_impl.clone();
1533
1534 let handle = task::spawn(async move {
1536 for i in 0..payloads_per_thread {
1538 let payload_id = thread_id * payloads_per_thread + i;
1540 let debug_struct = Arc::new(DebugStruct {
1541 foo: payload_id as i32,
1542 bar: format!("external_thread_{thread_id}_payload_{i}"),
1543 });
1544
1545 let correlation_uuid = Uuid::now_v7();
1546 let encoded = neuron_impl_clone
1547 .encode(debug_struct.as_ref())
1548 .expect("Encoding should succeed in test");
1549 let payload_raw = PayloadRaw::with_correlation(
1550 encoded,
1551 neuron_clone.clone(),
1552 Some(correlation_uuid),
1553 );
1554
1555 sleep(Duration::from_millis(1)).await;
1557
1558 let mut ganglion_guard = ganglion.lock().await;
1560 let erased_payload = erase_payload_raw(payload_raw);
1561 ganglion_guard
1562 .transmit_encoded(erased_payload)
1563 .await
1564 .expect("Failed to transmit encoded payload");
1565 }
1566 });
1567
1568 handles.push(handle);
1569 }
1570
1571 for handle in handles {
1573 handle.await.unwrap();
1574 }
1575
1576 let (received_payloads, received_raw_payloads) = receiver_handle.await.unwrap();
1578
1579 assert_eq!(
1581 shared_state
1582 .received_count
1583 .load(std::sync::atomic::Ordering::SeqCst),
1584 total_payloads * 2,
1585 "Should have received all decoded payloads on both regular reactants"
1586 );
1587
1588 assert_eq!(
1589 shared_state
1590 .raw_received_count
1591 .load(std::sync::atomic::Ordering::SeqCst),
1592 total_payloads * 2,
1593 "Should have received all raw payloads on both raw reactants"
1594 );
1595
1596 let mut foo_values = received_payloads
1598 .iter()
1599 .map(|p| p.value.foo)
1600 .collect::<Vec<_>>();
1601 foo_values.sort();
1602 foo_values.dedup();
1603
1604 assert_eq!(
1605 foo_values.len(),
1606 total_payloads,
1607 "Should have received payloads with all expected foo values"
1608 );
1609
1610 for i in 0..total_payloads {
1612 assert!(
1613 foo_values.contains(&(i as i32)),
1614 "Should have received a payload with foo={i}"
1615 );
1616 }
1617
1618 let mut correlation_ids = received_payloads
1620 .iter()
1621 .map(|p| p.correlation_id())
1622 .collect::<Vec<_>>();
1623
1624 correlation_ids.sort();
1627
1628 let mut correlation_id_counts = std::collections::HashMap::new();
1630 for id in &correlation_ids {
1631 *correlation_id_counts.entry(*id).or_insert(0) += 1;
1632 }
1633
1634 assert_eq!(
1636 correlation_id_counts.len(),
1637 total_payloads,
1638 "Should have received payloads with all expected correlation_ids"
1639 );
1640
1641 for (id, count) in correlation_id_counts {
1643 assert_eq!(
1644 count, 2,
1645 "Correlation ID {id} should appear exactly twice (once from each regular reactant)"
1646 );
1647 }
1648
1649 let mut raw_correlation_ids = received_raw_payloads
1651 .iter()
1652 .map(|p| p.correlation_id())
1653 .collect::<Vec<_>>();
1654
1655 raw_correlation_ids.sort();
1656
1657 let mut raw_correlation_id_counts = std::collections::HashMap::new();
1659 for id in &raw_correlation_ids {
1660 *raw_correlation_id_counts.entry(*id).or_insert(0) += 1;
1661 }
1662
1663 assert_eq!(
1665 raw_correlation_id_counts.len(),
1666 total_payloads,
1667 "Should have received raw payloads with all expected correlation_ids"
1668 );
1669
1670 for (id, count) in raw_correlation_id_counts {
1672 assert_eq!(
1673 count, 2,
1674 "Correlation ID {id} should appear exactly twice (once from each raw reactant)"
1675 );
1676 }
1677 }
1678}