1use crate::codec::{Codec, CodecName};
8use crate::erasure::payload::{PayloadErased, PayloadRawErased};
9use crate::erasure::reactant::{ErrorReactantErased, ReactantErased, ReactantRawErased};
10use crate::erasure::synapse::{SynapseInternalErased, erase_synapse_internal};
11use crate::logging::LogTrace;
12use crate::neuron::{Neuron, NeuronError};
13use crate::reactant::Reactant;
14use crate::synapse::SynapseInprocess;
15use crate::utils::struct_name_of_type;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20use std::sync::{Arc, RwLock};
21use thiserror::Error;
22use tokio::sync::Mutex;
23use tracing::Instrument;
24use uuid::Uuid;
25
26#[derive(Error, Debug)]
27pub enum GanglionError {
28 #[error("Synapse not found: {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
29 SynapseNotFound {
30 neuron_name: String,
31 ganglion_name: String,
32 ganglion_id: Uuid,
33 },
34 #[error(
35 "Failed to acquire lock on synapse: {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})"
36 )]
37 SynapseLock {
38 neuron_name: String,
39 ganglion_name: String,
40 ganglion_id: Uuid,
41 },
42 #[error(
43 "Transmission error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id}): {message}"
44 )]
45 Transmit {
46 neuron_name: String,
47 ganglion_name: String,
48 ganglion_id: Uuid,
49 message: String,
50 },
51 #[error("Encode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
52 Encode {
53 neuron_name: String,
54 ganglion_name: String,
55 ganglion_id: Uuid,
56 },
57 #[error("Decode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
58 Decode {
59 neuron_name: String,
60 ganglion_name: String,
61 ganglion_id: Uuid,
62 },
63 #[error(
64 "Adaptation error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})"
65 )]
66 Adapt {
67 neuron_name: String,
68 ganglion_name: String,
69 ganglion_id: Uuid,
70 },
71 #[error("Queue full for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
72 QueueFull {
73 neuron_name: String,
74 ganglion_name: String,
75 ganglion_id: Uuid,
76 },
77}
78
79impl GanglionError {
80 pub fn from_neuron_error(
82 neuron_error: NeuronError,
83 ganglion_name: String,
84 ganglion_id: Uuid,
85 ) -> Self {
86 match neuron_error {
87 NeuronError::Encode { neuron_name, .. } => GanglionError::Encode {
88 neuron_name,
89 ganglion_name,
90 ganglion_id,
91 },
92 NeuronError::Decode { neuron_name, .. } => GanglionError::Decode {
93 neuron_name,
94 ganglion_name,
95 ganglion_id,
96 },
97 }
98 }
99
100 pub fn from_plexus_error(
102 plexus_error: crate::plexus::PlexusError,
103 neuron_name: String,
104 ganglion_name: String,
105 ganglion_id: Uuid,
106 ) -> Self {
107 match plexus_error {
108 crate::plexus::PlexusError::Ganglion(ganglion_error) => ganglion_error,
110 _ => GanglionError::Adapt {
112 neuron_name,
113 ganglion_name,
114 ganglion_id,
115 },
116 }
117 }
118}
119
120pub trait Ganglion {
121 fn capable<T, C>(&mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
122 where
123 C: Codec<T> + CodecName + Send + Sync + 'static,
124 T: Send + Sync + 'static;
125
126 fn adapt<T, C>(
127 &mut self,
128 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
129 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>
130 where
131 C: Codec<T> + CodecName + Send + Sync + 'static,
132 T: Send + Sync + 'static;
133}
134
135pub trait GanglionInternal {
136 fn transmit(
137 &mut self,
138 payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
139 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>>;
140
141 fn react(
142 &mut self,
143 neuron_name: String,
144 reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
145 error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
146 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>;
147
148 fn unique_id(&self) -> Uuid;
149}
150
151pub trait GanglionExternal {
152 fn transmit(
153 &mut self,
154 payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
155 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>>;
156
157 #[allow(clippy::type_complexity)]
158 fn transmit_encoded(
159 &mut self,
160 payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
161 ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), GanglionError>> + Send + 'static>>;
162
163 fn react(
164 &mut self,
165 neuron_name: String,
166 reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
167 raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
168 error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
169 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>;
170
171 fn unique_id(&self) -> Uuid;
172}
173
174pub async fn adapt_all<T, C, G>(
178 ganglia: &[Arc<Mutex<G>>],
179 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
180) -> Result<(), GanglionError>
181where
182 C: Codec<T> + CodecName + Send + Sync + 'static,
183 T: Send + Sync + 'static,
184 G: Ganglion + ?Sized,
185{
186 for ganglion_mutex in ganglia {
187 let mut ganglion = ganglion_mutex.lock().await;
188 ganglion.adapt(neuron.clone()).await?;
189 }
190 Ok(())
191}
192
193#[macro_export]
201macro_rules! adapt_all {
202 ($neuron:expr, $($ganglion:expr),+ $(,)?) => {
203 async {
204 $(
205 $ganglion.lock().await.adapt($neuron.clone()).await?;
206 )+
207 Result::<(), $crate::ganglion::GanglionError>::Ok(())
208 }
209 };
210}
211
212pub struct GanglionInprocess {
213 id: Uuid,
214 synapses_by_name:
215 HashMap<String, Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>>,
216 relevant_neurons: HashSet<String>,
218 ignored_neurons: HashSet<String>,
220}
221
222impl GanglionInprocess {
223 pub fn new() -> Self {
224 Self {
225 id: Uuid::now_v7(),
226 synapses_by_name: HashMap::new(),
227 relevant_neurons: HashSet::new(),
228 ignored_neurons: HashSet::new(),
229 }
230 }
231
232 pub fn new_shared() -> Arc<Mutex<Self>> {
234 Arc::new(Mutex::new(Self::new()))
235 }
236
237 pub fn new_with_filters(
238 relevant_neurons: HashSet<String>,
239 ignored_neurons: HashSet<String>,
240 ) -> Self {
241 Self {
242 id: Uuid::now_v7(),
243 synapses_by_name: HashMap::new(),
244 relevant_neurons,
245 ignored_neurons,
246 }
247 }
248
249 pub fn create_synapse<T, C>(
250 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
251 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
252 error_reactants: Vec<Arc<dyn crate::reactant::ErrorReactant<T, C> + Send + Sync>>,
253 ) -> SynapseInprocess<T, C>
254 where
255 T: Send + Sync + 'static,
256 C: Codec<T> + CodecName + Send + Sync + 'static,
257 SynapseInprocess<T, C>: Send + Sync + 'static,
258 {
259 SynapseInprocess::<T, C>::new(neuron, reactants, error_reactants)
260 }
261
262 fn get_synapse_by_name(
263 &self,
264 name: &str,
265 ) -> Option<Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>> {
266 self.synapses_by_name.get(name).cloned()
267 }
268}
269
270impl Default for GanglionInprocess {
271 fn default() -> Self {
272 Self::new()
273 }
274}
275
276impl Ganglion for GanglionInprocess {
277 fn capable<T, C>(&mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
278 where
279 C: Codec<T> + CodecName + Send + Sync + 'static,
280 T: Send + Sync + 'static,
281 {
282 let neuron_name = neuron.name();
283
284 if !self.relevant_neurons.is_empty() && !self.relevant_neurons.contains(&neuron_name) {
285 return false;
286 }
287
288 if !self.ignored_neurons.is_empty() && self.ignored_neurons.contains(&neuron_name) {
289 return false;
290 }
291
292 true
293 }
294
295 fn adapt<T, C>(
296 &mut self,
297 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
298 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>
299 where
300 C: Codec<T> + CodecName + Send + Sync + 'static,
301 T: Send + Sync + 'static,
302 {
303 if !self.capable(neuron.clone()) {
305 return Box::pin(async move {
306 Ok(()) });
308 }
309
310 let neuron_name = neuron.name();
311
312 if self.synapses_by_name.contains_key(&neuron_name) {
314 return Box::pin(async move {
315 Ok(()) });
317 }
318
319 let synapse = Self::create_synapse(neuron.clone(), vec![], vec![]);
321 let erased_synapse = erase_synapse_internal(synapse);
322
323 self.synapses_by_name
325 .insert(neuron_name.clone(), erased_synapse);
326
327 Box::pin(async move { Ok(()) })
328 }
329}
330
331impl GanglionInternal for GanglionInprocess {
332 fn transmit(
333 &mut self,
334 payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
335 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>> {
336 let neuron_name = payload.get_neuron_name();
337 tracing::debug!("GanglionInprocess::transmit called for neuron: {neuron_name}");
338
339 if let Some(synapse_lock) = self.get_synapse_by_name(&neuron_name) {
340 tracing::debug!("GanglionInprocess::transmit found synapse, acquiring read lock");
341 match synapse_lock.read() {
342 Ok(synapse_guard) => {
343 tracing::debug!(
344 "GanglionInprocess::transmit acquired read lock, calling transmit_erased"
345 );
346 let future = synapse_guard.transmit_erased(payload.clone());
347 let ganglion_id = self.id;
348 let ganglion_name = struct_name_of_type::<Self>().to_string();
349 Box::pin(
350 async move {
351 tracing::debug!(
352 "GanglionInprocess::transmit awaiting transmit_erased future"
353 );
354 let result = future.await;
355 tracing::debug!("GanglionInprocess::transmit transmit_erased completed");
356 result.map_err(|e| match e {
357 crate::synapse::SynapseError::QueueFull { neuron_name: _ } => {
358 GanglionError::QueueFull {
359 neuron_name: neuron_name.clone(),
360 ganglion_name: ganglion_name.clone(),
361 ganglion_id,
362 }
363 }
364 _ => GanglionError::Transmit {
365 neuron_name: neuron_name.clone(),
366 ganglion_name: ganglion_name.clone(),
367 ganglion_id,
368 message: e.to_string(),
369 },
370 })
371 }
372 .instrument(payload.span_debug("GanglionInprocess::transmit")),
373 )
374 }
375 Err(_) => {
376 tracing::debug!("GanglionInprocess::transmit failed to acquire read lock");
377 let ganglion_id = self.id;
378 let ganglion_name = struct_name_of_type::<Self>().to_string();
379 Box::pin(async move {
380 Err(GanglionError::SynapseLock {
381 neuron_name,
382 ganglion_name,
383 ganglion_id,
384 })
385 })
386 }
387 }
388 } else {
389 tracing::debug!("GanglionInprocess::transmit synapse not found");
390 let ganglion_id = self.id;
391 let ganglion_name = struct_name_of_type::<GanglionInprocess>().to_string();
392 Box::pin(async move {
393 Err(GanglionError::SynapseNotFound {
394 neuron_name,
395 ganglion_name,
396 ganglion_id,
397 })
398 })
399 }
400 }
401
402 fn react(
403 &mut self,
404 neuron_name: String,
405 reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
406 error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
407 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
408 let synapse_lock_opt = self.get_synapse_by_name(&neuron_name);
410
411 if synapse_lock_opt.is_none() {
413 let ganglion_id = self.id;
414 let ganglion_name = struct_name_of_type::<GanglionInprocess>().to_string();
415 return Box::pin(async move {
416 Err(GanglionError::SynapseNotFound {
417 neuron_name,
418 ganglion_name,
419 ganglion_id,
420 })
421 });
422 }
423
424 let synapse_lock = synapse_lock_opt.unwrap();
426 let mut synapse_guard = match synapse_lock.write() {
427 Ok(guard) => guard,
428 Err(_) => {
429 let ganglion_id = self.id;
430 let ganglion_name = struct_name_of_type::<GanglionInprocess>().to_string();
431 return Box::pin(async move {
432 Err(GanglionError::SynapseLock {
433 neuron_name,
434 ganglion_name,
435 ganglion_id,
436 })
437 });
438 }
439 };
440
441 synapse_guard.react_erased(reactants, error_reactants);
443
444 Box::pin(async move { Ok(()) })
447 }
448
449 fn unique_id(&self) -> Uuid {
450 self.id
451 }
452}
453
454#[cfg(test)]
455mod tests {
456 use super::*;
457 use crate::erasure::payload::{erase_payload, erase_payload_raw};
458 use crate::erasure::reactant::{
459 ReactantErased, ReactantRawErased, erase_reactant, erase_reactant_raw,
460 };
461 use crate::namespace::NamespaceImpl;
462 use crate::neuron::{Neuron, NeuronImpl};
463 use crate::payload::{Payload, PayloadRaw};
464 use crate::test_utils::{
465 DebugCodec, DebugStruct, GanglionExternalInprocess, PingCodec, PingMsg, PingNeuron,
466 TokioMpscReactant, TokioMpscReactantGeneric, TokioMpscReactantRaw, test_namespace,
467 };
468 use std::collections::HashSet;
469 use std::sync::Arc;
470 use std::time::Duration;
471 use tokio::sync::mpsc::channel;
472 use tokio::task;
473 use tokio::time::sleep;
474 use uuid::Uuid;
475
476 #[test]
477 fn test_ganglion_error_with_ganglion_name() {
478 let ganglion_id = Uuid::now_v7();
479 let ganglion_name = struct_name_of_type::<GanglionInprocess>().to_string();
480
481 let synapse_not_found = GanglionError::SynapseNotFound {
483 neuron_name: "test_neuron".to_string(),
484 ganglion_name: ganglion_name.clone(),
485 ganglion_id,
486 };
487 assert!(synapse_not_found.to_string().contains("test_neuron"));
488 assert!(synapse_not_found.to_string().contains("GanglionInprocess"));
489
490 let synapse_lock_error = GanglionError::SynapseLock {
492 neuron_name: "test_neuron".to_string(),
493 ganglion_name: ganglion_name.clone(),
494 ganglion_id,
495 };
496 assert!(synapse_lock_error.to_string().contains("test_neuron"));
497 assert!(synapse_lock_error.to_string().contains("GanglionInprocess"));
498
499 let transmit_error = GanglionError::Transmit {
501 neuron_name: "test_neuron".to_string(),
502 ganglion_name: ganglion_name.clone(),
503 ganglion_id,
504 message: "test failure".to_string(),
505 };
506 assert!(transmit_error.to_string().contains("test_neuron"));
507 assert!(transmit_error.to_string().contains("GanglionInprocess"));
508 assert!(transmit_error.to_string().contains("test failure"));
509 }
510 #[tokio::test]
513 async fn test_ganglion_inprocess_get_synapse_by_name() {
514 let ns = test_namespace();
515 let neuron_impl_instance: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
516 let neuron_name_str = neuron_impl_instance.name();
517
518 let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
519 Arc::new(neuron_impl_instance);
520
521 let mut ganglion = GanglionInprocess::new();
522
523 ganglion
525 .adapt(neuron.clone())
526 .await
527 .expect("Failed to adapt neuron for test");
528
529 let result_by_name1 = ganglion.get_synapse_by_name(&neuron_name_str);
530 assert!(result_by_name1.is_some());
531 let synapse_by_name1 = result_by_name1.unwrap();
532
533 let result_by_name2 = ganglion.get_synapse_by_name(&neuron_name_str);
534 assert!(result_by_name2.is_some());
535 let synapse_by_name2 = result_by_name2.unwrap();
536
537 assert!(
538 Arc::ptr_eq(&synapse_by_name1, &synapse_by_name2),
539 "Repeated calls to get_synapse_by_name should yield the same Arc instance."
540 );
541
542 let non_existent_result = ganglion.get_synapse_by_name("non_existent_neuron");
543 assert!(non_existent_result.is_none());
544 }
545
546 #[tokio::test]
547 async fn test_ganglion_inprocess_transmit_via_adapt() {
548 let ns = test_namespace();
549
550 let (tx1, mut rx1) = channel::<Arc<Payload<PingMsg, PingCodec>>>(10);
551 let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
552
553 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
554
555 let neuron_arc = neuron_impl.clone_to_arc();
556
557 let mut ganglion: GanglionInprocess = GanglionInprocess::new();
558
559 ganglion
561 .adapt(neuron_arc.clone())
562 .await
563 .expect("Failed to adapt neuron");
564
565 let ping_neuron = Arc::new(PingNeuron::new(test_namespace()));
566 ganglion
567 .adapt(ping_neuron.clone())
568 .await
569 .expect("Failed to adapt ping neuron");
570
571 let reactants1: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> =
572 vec![erase_reactant::<PingMsg, PingCodec, _>(Box::new(
573 TokioMpscReactantGeneric::new(tx1.clone()),
574 ))];
575 let reactants2: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> =
576 vec![erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(
577 TokioMpscReactantGeneric::new(tx2.clone()),
578 ))];
579
580 ganglion
581 .react(ping_neuron.name(), reactants1, vec![])
582 .await
583 .expect("Failed to react ping");
584 ganglion
585 .react(neuron_arc.name(), reactants2, vec![])
586 .await
587 .expect("Failed to react debug");
588
589 let correlation_uuid1 = Uuid::now_v7();
590 let payload1 = Payload::builder()
591 .value(PingMsg { seq: 1 })
592 .correlation_id(correlation_uuid1)
593 .neuron(ping_neuron)
594 .build()
595 .unwrap();
596
597 let erased_payload1 = erase_payload(payload1);
598 ganglion
599 .transmit(erased_payload1)
600 .await
601 .expect("Failed to transmit payload1");
602
603 assert_eq!(
604 rx1.len(),
605 1,
606 "Reactant 1 should have received the first message (ping)"
607 );
608 let received_p1_ch1 = rx1.recv().await.unwrap();
609 assert_eq!(
610 received_p1_ch1.value.seq, 1,
611 "Payload value mismatch for reactant 1"
612 );
613 assert_eq!(
614 received_p1_ch1.correlation_id(), correlation_uuid1,
615 "Correlation ID mismatch for reactant 1"
616 );
617
618 let debug_struct_arc_2 = Arc::new(DebugStruct {
619 foo: 456,
620 bar: "ganglion_test_payload_2".to_string(),
621 });
622 let correlation_uuid2 = Uuid::now_v7();
623 let payload2 = Payload::builder()
624 .value((*debug_struct_arc_2).clone())
625 .correlation_id(correlation_uuid2)
626 .neuron(neuron_arc.clone())
627 .build()
628 .unwrap();
629
630 let erased_payload2 = erase_payload(payload2);
631 ganglion
632 .transmit(erased_payload2)
633 .await
634 .expect("Failed to transmit payload2");
635
636 assert_eq!(
637 rx2.len(),
638 1,
639 "Reactant 2 should have received the second message (debug)"
640 );
641 let received_p2_ch1 = rx2.recv().await.unwrap();
642 assert_eq!(
643 received_p2_ch1.value, debug_struct_arc_2,
644 "Second payload value mismatch for reactant 2"
645 );
646 assert_eq!(
647 received_p2_ch1.correlation_id(), correlation_uuid2,
648 "Second correlation ID mismatch for reactant 2"
649 );
650
651 assert_eq!(
653 rx1.len(),
654 0,
655 "Reactant 1 channel should be empty after all expected messages"
656 );
657 assert_eq!(
658 rx2.len(),
659 0,
660 "Reactant 2 channel should be empty after all expected messages"
661 );
662 }
663
664 #[tokio::test]
665 async fn test_ganglion_inprocess_across_threads() {
666 struct SharedState {
668 tx1: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
670 tx2: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
671 received_count: std::sync::atomic::AtomicUsize,
673 }
674
675 let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
677 let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
678
679 let shared_state = Arc::new(SharedState {
681 tx1,
682 tx2,
683 received_count: std::sync::atomic::AtomicUsize::new(0),
684 });
685
686 let num_threads = 10;
688 let payloads_per_thread = 10;
689 let total_payloads = num_threads * payloads_per_thread;
690
691 let mut handles = Vec::new();
693
694 let receiver_state = shared_state.clone();
696 let receiver_handle = task::spawn(async move {
697 let mut received_payloads = Vec::new();
698
699 for _ in 0..total_payloads * 2 {
701 tokio::select! {
702 Some(payload) = rx1.recv() => {
703 received_payloads.push(payload);
704 receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
705 }
706 Some(payload) = rx2.recv() => {
707 received_payloads.push(payload);
708 receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
709 }
710 }
711 }
712
713 received_payloads
714 });
715
716 let ns = test_namespace();
718 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
719 let neuron = neuron_impl.clone_to_arc();
720
721 let reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
722 erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
723 sender: shared_state.tx1.clone(),
724 })),
725 erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
726 sender: shared_state.tx2.clone(),
727 })),
728 ];
729
730 let mut shared_ganglion = GanglionInprocess::new();
731 shared_ganglion
733 .adapt(neuron.clone())
734 .await
735 .expect("Failed to adapt neuron");
736 shared_ganglion
737 .react(neuron.name(), reactants, vec![])
738 .await
739 .expect("Failed to react");
740
741 let shared_ganglion = Arc::new(tokio::sync::Mutex::new(shared_ganglion));
742
743 for thread_id in 0..num_threads {
745 let ganglion = shared_ganglion.clone();
746 let neuron_clone = neuron.clone();
747
748 let handle = task::spawn(async move {
750 for i in 0..payloads_per_thread {
752 let payload_id = thread_id * payloads_per_thread + i;
754 let debug_struct = Arc::new(DebugStruct {
755 foo: payload_id as i32,
756 bar: format!("thread_{thread_id}_payload_{i}"),
757 });
758
759 let correlation_uuid = Uuid::now_v7();
760 let payload = Payload::builder()
761 .value((*debug_struct).clone())
762 .correlation_id(correlation_uuid)
763 .neuron(neuron_clone.clone())
764 .build()
765 .unwrap();
766
767 sleep(Duration::from_millis(1)).await;
769
770 let mut ganglion_guard = ganglion.lock().await;
772 let erased_payload = erase_payload(payload);
773 let _ = ganglion_guard.transmit(erased_payload).await;
774 }
775 });
776
777 handles.push(handle);
778 }
779
780 for handle in handles {
782 handle.await.unwrap();
783 }
784
785 let received_payloads = receiver_handle.await.unwrap();
787
788 assert_eq!(
790 shared_state
791 .received_count
792 .load(std::sync::atomic::Ordering::SeqCst),
793 total_payloads * 2,
794 "Should have received all payloads on both reactants"
795 );
796
797 let mut foo_values = received_payloads
799 .iter()
800 .map(|p| p.value.foo)
801 .collect::<Vec<_>>();
802 foo_values.sort();
803 foo_values.dedup();
804
805 assert_eq!(
806 foo_values.len(),
807 total_payloads,
808 "Should have received payloads with all expected foo values"
809 );
810
811 for i in 0..total_payloads {
813 assert!(
814 foo_values.contains(&(i as i32)),
815 "Should have received a payload with foo={i}"
816 );
817 }
818
819 let mut correlation_ids = received_payloads
821 .iter()
822 .map(|p| p.correlation_id())
823 .collect::<Vec<_>>();
824
825 correlation_ids.sort();
828
829 let mut correlation_id_counts = std::collections::HashMap::new();
831 for id in &correlation_ids {
832 *correlation_id_counts.entry(*id).or_insert(0) += 1;
833 }
834
835 assert_eq!(
837 correlation_id_counts.len(),
838 total_payloads,
839 "Should have received payloads with all expected correlation_ids"
840 );
841
842 for (id, count) in correlation_id_counts {
844 assert_eq!(
845 count, 2,
846 "Correlation ID {id} should appear exactly twice (once from each reactant)"
847 );
848 }
849 }
850
851 #[tokio::test]
852 async fn test_ganglion_external_unique_id() {
853 use crate::test_utils::GanglionExternalInprocess;
854 let ganglion1 = GanglionExternalInprocess::new();
855 let ganglion2 = GanglionExternalInprocess::new();
856
857 assert_ne!(ganglion1.unique_id(), ganglion2.unique_id());
859
860 assert_eq!(ganglion1.unique_id(), ganglion1.unique_id());
862 }
863
864 #[tokio::test]
865 async fn test_ganglion_inprocess_capable_with_relevant_neurons() {
866 let neuron1 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
868 NamespaceImpl {
869 delimiter: ".",
870 parts: vec!["dev", "plexo", "1"],
871 },
872 )));
873 let neuron2 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
874 NamespaceImpl {
875 delimiter: ".",
876 parts: vec!["dev", "plexo", "2"],
877 },
878 )));
879 let mut relevant_neurons = HashSet::new();
881 relevant_neurons.insert("dev.plexo.1.DebugStruct.debug".to_string());
882 let mut ganglion = GanglionInprocess::new_with_filters(relevant_neurons, HashSet::new());
883
884 assert!(ganglion.capable(neuron1.clone()));
886
887 assert!(!ganglion.capable(neuron2.clone()));
889 }
890
891 #[tokio::test]
892 async fn test_ganglion_inprocess_capable_with_ignored_neurons() {
893 let neuron1 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
895 NamespaceImpl {
896 delimiter: ".",
897 parts: vec!["dev", "plexo", "1"],
898 },
899 )));
900 let neuron2 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
901 NamespaceImpl {
902 delimiter: ".",
903 parts: vec!["dev", "plexo", "2"],
904 },
905 )));
906
907 let mut ignored_neurons = HashSet::new();
909 ignored_neurons.insert("dev.plexo.1.DebugStruct.debug".to_string());
910 let mut ganglion = GanglionInprocess::new_with_filters(HashSet::new(), ignored_neurons);
911
912 assert!(!ganglion.capable(neuron1.clone()));
914
915 assert!(ganglion.capable(neuron2.clone()));
917 }
918
919 #[tokio::test]
920 async fn test_ganglion_external_inprocess_capable_with_relevant_neurons() {
921 let neuron1 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
923 NamespaceImpl {
924 delimiter: ".",
925 parts: vec!["dev", "plexo", "1"],
926 },
927 )));
928 let neuron2 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
929 NamespaceImpl {
930 delimiter: ".",
931 parts: vec!["dev", "plexo", "2"],
932 },
933 )));
934
935 let mut relevant_neurons = HashSet::new();
937 relevant_neurons.insert("dev.plexo.1.DebugStruct.debug".to_string());
938 let mut ganglion =
939 GanglionExternalInprocess::new_with_filters(relevant_neurons, HashSet::new());
940
941 assert!(ganglion.capable(neuron1.clone()));
943
944 assert!(!ganglion.capable(neuron2.clone()));
946 }
947
948 #[tokio::test]
949 async fn test_ganglion_external_inprocess_capable_with_ignored_neurons() {
950 let neuron1 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
952 NamespaceImpl {
953 delimiter: ".",
954 parts: vec!["dev", "plexo", "1"],
955 },
956 )));
957 let neuron2 = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
958 NamespaceImpl {
959 delimiter: ".",
960 parts: vec!["dev", "plexo", "2"],
961 },
962 )));
963
964 let mut ignored_neurons = HashSet::new();
966 ignored_neurons.insert("dev.plexo.1.DebugStruct.debug".to_string());
967 let mut ganglion =
968 GanglionExternalInprocess::new_with_filters(HashSet::new(), ignored_neurons);
969
970 assert!(!ganglion.capable(neuron1.clone()));
972
973 assert!(ganglion.capable(neuron2.clone()));
975 }
976
977 #[tokio::test]
978 async fn test_ganglion_inprocess_capable_default_behavior() {
979 let neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(Arc::new(
981 NamespaceImpl {
982 delimiter: ".",
983 parts: vec!["dev", "plexo"],
984 },
985 )));
986
987 let mut ganglion = GanglionInprocess::new();
989
990 assert!(ganglion.capable(neuron.clone()));
992 }
993
994 #[tokio::test]
995 async fn test_ganglion_external_inprocess_capable_default_behavior() {
996 let neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(test_namespace()));
998
999 let mut ganglion = GanglionExternalInprocess::new();
1001
1002 assert!(ganglion.capable(neuron.clone()));
1004 }
1005
1006 #[tokio::test]
1007 async fn test_ganglion_external_transmit_encoded() {
1008 let ns = test_namespace();
1009 let mut ganglion = GanglionExternalInprocess::new();
1010 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1011 let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
1012 Arc::new(neuron_impl.clone());
1013
1014 ganglion
1016 .adapt(neuron.clone())
1017 .await
1018 .expect("Failed to adapt neuron");
1019
1020 let debug_struct_value = DebugStruct {
1021 foo: 42,
1022 bar: "test_value".to_owned(),
1023 };
1024 let debug_struct_arc = Arc::new(debug_struct_value);
1025 let correlation_id = Uuid::now_v7();
1026 let encoded = neuron_impl
1027 .encode(debug_struct_arc.as_ref())
1028 .expect("Encoding should succeed in test");
1029
1030 let payload_raw =
1031 PayloadRaw::with_correlation(encoded, Some(neuron.clone()), Some(correlation_id));
1032
1033 let erased_payload = erase_payload_raw(payload_raw);
1034 let result = ganglion
1035 .transmit_encoded(erased_payload)
1036 .await
1037 .expect("Failed to transmit encoded payload");
1038
1039 assert_eq!(result.0.len(), 0);
1041 assert_eq!(result.1.len(), 0);
1042 }
1043
1044 #[tokio::test]
1045 async fn test_ganglion_external_adapt() {
1046 let mut ganglion = GanglionExternalInprocess::new();
1047 let ns = test_namespace();
1048 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1049 let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
1050 Arc::new(neuron_impl);
1051
1052 ganglion
1054 .adapt(neuron)
1055 .await
1056 .expect("Failed to adapt neuron");
1057 }
1058
1059 #[tokio::test]
1060 async fn test_ganglion_external_inprocess_transmit_via_adapt() {
1061 let ns = test_namespace();
1062
1063 let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
1064 let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
1065 let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(2);
1066 let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(2);
1067
1068 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1069 let neuron_arc = neuron_impl.clone_to_arc();
1070
1071 let reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
1072 erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
1073 sender: tx1.clone(),
1074 })),
1075 erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
1076 sender: tx2.clone(),
1077 })),
1078 ];
1079
1080 let raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
1081 erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
1082 sender: raw_tx1.clone(),
1083 })),
1084 erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
1085 sender: raw_tx2.clone(),
1086 })),
1087 ];
1088
1089 let mut ganglion: GanglionExternalInprocess = GanglionExternalInprocess::new();
1090
1091 ganglion
1092 .adapt(neuron_arc.clone())
1093 .await
1094 .expect("Failed to adapt neuron");
1095 ganglion
1096 .react(neuron_arc.name(), reactants, raw_reactants, vec![])
1097 .await
1098 .expect("Failed to react");
1099
1100 let debug_struct_arc = Arc::new(DebugStruct {
1101 foo: 123,
1102 bar: "ganglion_external_test_payload_1".to_string(),
1103 });
1104 let correlation_uuid1 = Uuid::now_v7();
1105 let encoded = neuron_impl
1106 .encode(debug_struct_arc.as_ref())
1107 .expect("Encoding should succeed in test");
1108 let payload_raw1 = PayloadRaw::with_correlation(
1109 encoded,
1110 Some(neuron_arc.clone()),
1111 Some(correlation_uuid1),
1112 );
1113
1114 let erased_payload1 = erase_payload_raw(payload_raw1);
1115 ganglion
1116 .transmit_encoded(erased_payload1)
1117 .await
1118 .expect("Failed to transmit encoded payload1");
1119
1120 let received_raw_p1_ch1 =
1122 tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx1.recv())
1123 .await
1124 .expect("Timeout raw_rx1")
1125 .expect("Closed raw_rx1");
1126 assert_eq!(
1127 received_raw_p1_ch1.correlation_id(), correlation_uuid1,
1128 "Raw correlation ID mismatch for reactant 1"
1129 );
1130
1131 let received_raw_p1_ch2 =
1132 tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx2.recv())
1133 .await
1134 .expect("Timeout raw_rx2")
1135 .expect("Closed raw_rx2");
1136 assert_eq!(
1137 received_raw_p1_ch2.correlation_id(), correlation_uuid1,
1138 "Raw correlation ID mismatch for reactant 2"
1139 );
1140
1141 let received_p1_ch1 =
1143 tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
1144 .await
1145 .expect("Timeout rx1")
1146 .expect("Closed rx1");
1147 assert_eq!(
1148 received_p1_ch1.value, debug_struct_arc,
1149 "Payload value mismatch for reactant 1"
1150 );
1151 assert_eq!(
1152 received_p1_ch1.correlation_id(), correlation_uuid1,
1153 "Correlation ID mismatch for reactant 1"
1154 );
1155
1156 let received_p1_ch2 =
1157 tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1158 .await
1159 .expect("Timeout rx2")
1160 .expect("Closed rx2");
1161 assert_eq!(
1162 received_p1_ch2.value, debug_struct_arc,
1163 "Payload value mismatch for reactant 2"
1164 );
1165 assert_eq!(
1166 received_p1_ch2.correlation_id(), correlation_uuid1,
1167 "Correlation ID mismatch for reactant 2"
1168 );
1169
1170 let debug_struct_arc_2 = Arc::new(DebugStruct {
1172 foo: 456,
1173 bar: "ganglion_external_test_payload_2".to_string(),
1174 });
1175 let correlation_uuid2 = Uuid::now_v7();
1176 let encoded2 = neuron_impl
1177 .encode(debug_struct_arc_2.as_ref())
1178 .expect("Encoding should succeed in test");
1179 let payload_raw2 = PayloadRaw::with_correlation(
1180 encoded2,
1181 Some(neuron_arc.clone()),
1182 Some(correlation_uuid2),
1183 );
1184
1185 let erased_payload2 = erase_payload_raw(payload_raw2);
1186 ganglion
1187 .transmit_encoded(erased_payload2)
1188 .await
1189 .expect("Failed to transmit encoded payload2");
1190
1191 let received_raw_p2_ch1 =
1193 tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx1.recv())
1194 .await
1195 .expect("Timeout raw_rx1_2")
1196 .expect("Closed raw_rx1_2");
1197 assert_eq!(
1198 received_raw_p2_ch1.correlation_id(), correlation_uuid2,
1199 "Second raw correlation ID mismatch for reactant 1"
1200 );
1201
1202 let received_raw_p2_ch2 =
1203 tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx2.recv())
1204 .await
1205 .expect("Timeout raw_rx2_2")
1206 .expect("Closed raw_rx2_2");
1207 assert_eq!(
1208 received_raw_p2_ch2.correlation_id(), correlation_uuid2,
1209 "Second raw correlation ID mismatch for reactant 2"
1210 );
1211
1212 let received_p2_ch1 =
1214 tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
1215 .await
1216 .expect("Timeout rx1_2")
1217 .expect("Closed rx1_2");
1218 assert_eq!(
1219 received_p2_ch1.value, debug_struct_arc_2,
1220 "Second payload value mismatch for reactant 1"
1221 );
1222 assert_eq!(
1223 received_p2_ch1.correlation_id(), correlation_uuid2,
1224 "Second correlation ID mismatch for reactant 1"
1225 );
1226
1227 let received_p2_ch2 =
1228 tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1229 .await
1230 .expect("Timeout rx2_2")
1231 .expect("Closed rx2_2");
1232 assert_eq!(
1233 received_p2_ch2.value, debug_struct_arc_2,
1234 "Second payload value mismatch for reactant 2"
1235 );
1236 assert_eq!(
1237 received_p2_ch2.correlation_id(), correlation_uuid2,
1238 "Second correlation ID mismatch for reactant 2"
1239 );
1240
1241 assert_eq!(
1243 rx1.len(),
1244 0,
1245 "Reactant 1 channel should be empty after all expected messages"
1246 );
1247 assert_eq!(
1248 rx2.len(),
1249 0,
1250 "Reactant 2 channel should be empty after all expected messages"
1251 );
1252 assert_eq!(
1253 raw_rx1.len(),
1254 0,
1255 "Raw reactant 1 channel should be empty after all expected messages"
1256 );
1257 assert_eq!(
1258 raw_rx2.len(),
1259 0,
1260 "Raw reactant 2 channel should be empty after all expected messages"
1261 );
1262 }
1263
1264 #[tokio::test]
1265 async fn test_ganglion_inprocess_adapt_erased() {
1266 let ns = test_namespace();
1267
1268 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1270 let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
1271 Arc::new(neuron_impl);
1272
1273 let mut ganglion = GanglionInprocess::new();
1275
1276 ganglion
1278 .adapt(neuron.clone())
1279 .await
1280 .expect("Failed to adapt neuron");
1281
1282 let neuron_name = neuron.name();
1284 let synapse = ganglion.get_synapse_by_name(&neuron_name);
1285 assert!(synapse.is_some());
1286 }
1287
1288 #[tokio::test]
1289 async fn test_ganglion_external_inprocess_across_threads() {
1290 use crate::ganglion::GanglionExternal;
1291 use crate::neuron::NeuronImpl;
1292 use crate::payload::PayloadRaw;
1293 use crate::test_utils::{
1294 DebugCodec, DebugStruct, GanglionExternalInprocess, TokioMpscReactant,
1295 TokioMpscReactantRaw, test_namespace,
1296 };
1297 use std::sync::Arc;
1298 use tokio::sync::mpsc::channel;
1299 use tokio::task;
1300 use tokio::time::{Duration, sleep};
1301 use uuid::Uuid;
1302
1303 struct SharedState {
1305 tx1: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
1307 tx2: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
1308 raw_tx1: tokio::sync::mpsc::Sender<Arc<PayloadRaw<DebugStruct, DebugCodec>>>,
1309 raw_tx2: tokio::sync::mpsc::Sender<Arc<PayloadRaw<DebugStruct, DebugCodec>>>,
1310 received_count: std::sync::atomic::AtomicUsize,
1312 raw_received_count: std::sync::atomic::AtomicUsize,
1313 }
1314
1315 let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
1317 let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
1318 let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(100);
1319 let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(100);
1320
1321 let shared_state = Arc::new(SharedState {
1323 tx1,
1324 tx2,
1325 raw_tx1,
1326 raw_tx2,
1327 received_count: std::sync::atomic::AtomicUsize::new(0),
1328 raw_received_count: std::sync::atomic::AtomicUsize::new(0),
1329 });
1330
1331 let num_threads = 10;
1333 let payloads_per_thread = 10;
1334 let total_payloads = num_threads * payloads_per_thread;
1335
1336 let mut handles = Vec::new();
1338
1339 let receiver_state = shared_state.clone();
1341 let receiver_handle = task::spawn(async move {
1342 let mut received_payloads = Vec::new();
1343 let mut received_raw_payloads = Vec::new();
1344
1345 for _ in 0..total_payloads * 4 {
1347 tokio::select! {
1349 Some(payload) = rx1.recv() => {
1350 received_payloads.push(payload);
1351 receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1352 }
1353 Some(payload) = rx2.recv() => {
1354 received_payloads.push(payload);
1355 receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1356 }
1357 Some(payload) = raw_rx1.recv() => {
1358 received_raw_payloads.push(payload);
1359 receiver_state.raw_received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1360 }
1361 Some(payload) = raw_rx2.recv() => {
1362 received_raw_payloads.push(payload);
1363 receiver_state.raw_received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1364 }
1365 }
1366 }
1367
1368 (received_payloads, received_raw_payloads)
1369 });
1370
1371 let ns = test_namespace();
1373 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1374 let neuron = neuron_impl.clone_to_arc();
1375
1376 let reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
1378 erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
1379 sender: shared_state.tx1.clone(),
1380 })),
1381 erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
1382 sender: shared_state.tx2.clone(),
1383 })),
1384 ];
1385
1386 let raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
1387 erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
1388 sender: shared_state.raw_tx1.clone(),
1389 })),
1390 erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
1391 sender: shared_state.raw_tx2.clone(),
1392 })),
1393 ];
1394
1395 let mut shared_ganglion = GanglionExternalInprocess::new();
1396 shared_ganglion
1397 .adapt(neuron.clone())
1398 .await
1399 .expect("Failed to adapt neuron");
1400 shared_ganglion
1401 .react(neuron.name(), reactants, raw_reactants, vec![])
1402 .await
1403 .expect("Failed to react");
1404
1405 let shared_ganglion = Arc::new(tokio::sync::Mutex::new(shared_ganglion));
1406
1407 for thread_id in 0..num_threads {
1409 let ganglion = shared_ganglion.clone();
1410 let neuron_clone = neuron.clone();
1411 let neuron_impl_clone = neuron_impl.clone();
1412
1413 let handle = task::spawn(async move {
1415 for i in 0..payloads_per_thread {
1417 let payload_id = thread_id * payloads_per_thread + i;
1419 let debug_struct = Arc::new(DebugStruct {
1420 foo: payload_id as i32,
1421 bar: format!("external_thread_{thread_id}_payload_{i}"),
1422 });
1423
1424 let correlation_uuid = Uuid::now_v7();
1425 let encoded = neuron_impl_clone
1426 .encode(debug_struct.as_ref())
1427 .expect("Encoding should succeed in test");
1428 let payload_raw = PayloadRaw::with_correlation(
1429 encoded,
1430 Some(neuron_clone.clone()),
1431 Some(correlation_uuid),
1432 );
1433
1434 sleep(Duration::from_millis(1)).await;
1436
1437 let mut ganglion_guard = ganglion.lock().await;
1439 let erased_payload = erase_payload_raw(payload_raw);
1440 ganglion_guard
1441 .transmit_encoded(erased_payload)
1442 .await
1443 .expect("Failed to transmit encoded payload");
1444 }
1445 });
1446
1447 handles.push(handle);
1448 }
1449
1450 for handle in handles {
1452 handle.await.unwrap();
1453 }
1454
1455 let (received_payloads, received_raw_payloads) = receiver_handle.await.unwrap();
1457
1458 assert_eq!(
1460 shared_state
1461 .received_count
1462 .load(std::sync::atomic::Ordering::SeqCst),
1463 total_payloads * 2,
1464 "Should have received all decoded payloads on both regular reactants"
1465 );
1466
1467 assert_eq!(
1468 shared_state
1469 .raw_received_count
1470 .load(std::sync::atomic::Ordering::SeqCst),
1471 total_payloads * 2,
1472 "Should have received all raw payloads on both raw reactants"
1473 );
1474
1475 let mut foo_values = received_payloads
1477 .iter()
1478 .map(|p| p.value.foo)
1479 .collect::<Vec<_>>();
1480 foo_values.sort();
1481 foo_values.dedup();
1482
1483 assert_eq!(
1484 foo_values.len(),
1485 total_payloads,
1486 "Should have received payloads with all expected foo values"
1487 );
1488
1489 for i in 0..total_payloads {
1491 assert!(
1492 foo_values.contains(&(i as i32)),
1493 "Should have received a payload with foo={i}"
1494 );
1495 }
1496
1497 let mut correlation_ids = received_payloads
1499 .iter()
1500 .map(|p| p.correlation_id())
1501 .collect::<Vec<_>>();
1502
1503 correlation_ids.sort();
1506
1507 let mut correlation_id_counts = std::collections::HashMap::new();
1509 for id in &correlation_ids {
1510 *correlation_id_counts.entry(*id).or_insert(0) += 1;
1511 }
1512
1513 assert_eq!(
1515 correlation_id_counts.len(),
1516 total_payloads,
1517 "Should have received payloads with all expected correlation_ids"
1518 );
1519
1520 for (id, count) in correlation_id_counts {
1522 assert_eq!(
1523 count, 2,
1524 "Correlation ID {id} should appear exactly twice (once from each regular reactant)"
1525 );
1526 }
1527
1528 let mut raw_correlation_ids = received_raw_payloads
1530 .iter()
1531 .map(|p| p.correlation_id())
1532 .collect::<Vec<_>>();
1533
1534 raw_correlation_ids.sort();
1535
1536 let mut raw_correlation_id_counts = std::collections::HashMap::new();
1538 for id in &raw_correlation_ids {
1539 *raw_correlation_id_counts.entry(*id).or_insert(0) += 1;
1540 }
1541
1542 assert_eq!(
1544 raw_correlation_id_counts.len(),
1545 total_payloads,
1546 "Should have received raw payloads with all expected correlation_ids"
1547 );
1548
1549 for (id, count) in raw_correlation_id_counts {
1551 assert_eq!(
1552 count, 2,
1553 "Correlation ID {id} should appear exactly twice (once from each raw reactant)"
1554 );
1555 }
1556 }
1557}