1pub mod reactants;
8
9use crate::axon::AxonError;
10use crate::codec::{Codec, CodecName};
11use crate::erasure::neuron::{NeuronErased, erase_neuron};
12use crate::erasure::payload::PayloadErased;
13use crate::erasure::reactant::{ErrorReactantErased, ReactantErased, ReactantRawErased};
14use crate::ganglion::{
15 Ganglion, GanglionError, GanglionExternal, GanglionInprocess, GanglionInternal,
16};
17use crate::neuron::Neuron;
18use crate::plexus::reactants::{
19 ErasedExternalInternalReactantFactory, PlexusExternalExternalReactantFactory,
20 PlexusExternalInternalReactantFactory, PlexusInternalReactantFactory,
21};
22use itertools::Itertools;
23use moka::future::Cache;
24use reactants::{ErasedExternalExternalReactantFactory, ErasedInternalReactantFactory};
25use std::collections::{HashMap, HashSet};
26use std::future::Future;
27use std::pin::Pin;
28use std::sync::Arc;
29use std::time::Duration;
30use thiserror::Error;
31use tokio::sync::{Mutex, RwLock};
32use uuid::Uuid;
33
34#[derive(Error, Debug)]
35pub enum PlexusError {
36 #[error("Ganglion error: {0}")]
37 Ganglion(#[from] GanglionError),
38 #[error("Failed to acquire lock on external ganglia")]
39 ExternalGangliaLock,
40 #[error("Failed to acquire lock on internal ganglia")]
41 InternalGangliaLock,
42 #[error("Failed to acquire lock on neurons")]
43 NeuronsLock,
44 #[error("Failed to acquire lock on reactant factories")]
45 ReactantFactoriesLock,
46 #[error("Failed to acquire lock on neuron ganglia")]
47 NeuronGangliaLock,
48 #[error("Failed to acquire lock on reactions")]
49 ReactionsLock,
50 #[error("Neuron adaptation failed for {neuron_name}")]
51 NeuronAdaptation { neuron_name: String },
52 #[error("Reactant creation failed for {neuron_name}")]
53 ReactantCreation { neuron_name: String },
54 #[error("Transmission failed")]
55 Transmission,
56}
57
58impl From<AxonError> for PlexusError {
59 fn from(error: AxonError) -> Self {
60 match error {
61 AxonError::GanglionError(e) => PlexusError::Ganglion(e),
62 AxonError::NeuronNotAdapted {
63 neuron_name,
64 ganglion_name,
65 ganglion_id,
66 } => PlexusError::Ganglion(GanglionError::SynapseNotFound {
67 neuron_name,
68 ganglion_name,
69 ganglion_id,
70 }),
71 AxonError::SynapseLock {
72 neuron_name,
73 ganglion_name,
74 ganglion_id,
75 } => PlexusError::Ganglion(GanglionError::SynapseLock {
76 neuron_name,
77 ganglion_name,
78 ganglion_id,
79 }),
80 AxonError::Transmit {
81 neuron_name,
82 ganglion_name,
83 ganglion_id,
84 message,
85 } => PlexusError::Ganglion(GanglionError::Transmit {
86 neuron_name,
87 ganglion_name,
88 ganglion_id,
89 message,
90 }),
91 AxonError::Encode {
92 neuron_name,
93 ganglion_name,
94 ganglion_id,
95 } => PlexusError::Ganglion(GanglionError::Encode {
96 neuron_name,
97 ganglion_name,
98 ganglion_id,
99 }),
100 AxonError::Decode {
101 neuron_name,
102 ganglion_name,
103 ganglion_id,
104 } => PlexusError::Ganglion(GanglionError::Decode {
105 neuron_name,
106 ganglion_name,
107 ganglion_id,
108 }),
109 AxonError::TransmissionTimeout => PlexusError::Transmission,
110 }
111 }
112}
113
114pub struct PlexusReactantFactories {
115 pub internal_factory: Arc<dyn ErasedInternalReactantFactory + Send + Sync>,
116 pub external_internal_factory: Arc<dyn ErasedExternalInternalReactantFactory + Send + Sync>,
117 pub external_external_factory: Arc<dyn ErasedExternalExternalReactantFactory + Send + Sync>,
118}
119
120#[allow(clippy::type_complexity)]
123pub struct Plexus {
124 id: Uuid,
126
127 inproc_ganglion: Arc<Mutex<GanglionInprocess>>,
129
130 external_ganglia:
132 Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>>,
133
134 internal_ganglia:
136 Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>>,
137
138 neurons: Arc<RwLock<HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>>>,
140
141 reactant_factories: Arc<RwLock<HashMap<String, PlexusReactantFactories>>>,
143
144 neuron_ganglia: Arc<RwLock<HashSet<(String, Uuid)>>>,
146
147 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
150
151 relevant_neurons: HashSet<String>,
153
154 ignored_neurons: HashSet<String>,
156}
157impl Plexus {
158 pub async fn neurons(
160 &self,
161 ) -> tokio::sync::RwLockReadGuard<
162 '_,
163 HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>,
164 > {
165 self.neurons.read().await
166 }
167
168 pub async fn new(
170 relevant_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
171 ignored_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
172 ) -> Self {
173 let inproc_ganglion = Arc::new(Mutex::new(GanglionInprocess::new()));
174
175 let relevant_neuron_names = relevant_neurons
177 .iter()
178 .map(|n| n.name())
179 .collect::<HashSet<String>>();
180
181 let ignored_neuron_names = ignored_neurons
182 .iter()
183 .map(|n| n.name())
184 .collect::<HashSet<String>>();
185
186 let reactions = Cache::builder()
187 .time_to_idle(Duration::from_secs(60)) .build();
189
190 let plexus = Self {
191 id: Uuid::now_v7(),
192 inproc_ganglion: inproc_ganglion.clone(),
193 external_ganglia: Arc::new(RwLock::new(HashMap::new())),
194 internal_ganglia: Arc::new(RwLock::new(HashMap::new())),
195 neurons: Arc::new(RwLock::new(HashMap::new())),
196 reactant_factories: Arc::new(RwLock::new(HashMap::new())),
197 neuron_ganglia: Arc::new(RwLock::new(HashSet::new())),
198 reactions,
199 relevant_neurons: relevant_neuron_names,
200 ignored_neurons: ignored_neuron_names,
201 };
202
203 let ganglion_id = {
205 let ganglion_guard = inproc_ganglion.lock().await;
206 ganglion_guard.unique_id()
207 };
208 plexus
209 .internal_ganglia
210 .write()
211 .await
212 .insert(ganglion_id, inproc_ganglion);
213
214 plexus
215 }
216
217 pub async fn new_shared(
219 relevant_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
220 ignored_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
221 ) -> Arc<Mutex<Self>> {
222 Arc::new(Mutex::new(
223 Self::new(relevant_neurons, ignored_neurons).await,
224 ))
225 }
226
227 pub async fn infuse_ganglion<G>(&mut self, ganglion: Arc<Mutex<G>>) -> Result<(), PlexusError>
229 where
230 G: GanglionInternal + Ganglion + Send + Sync + 'static,
231 {
232 let ganglion_id = {
234 let ganglion_guard = ganglion.lock().await;
235 ganglion_guard.unique_id()
236 };
237
238 self.internal_ganglia
239 .write()
240 .await
241 .insert(ganglion_id, ganglion.clone());
242
243 self.update_neuron_ganglia().await?;
244 Ok(())
245 }
246
247 pub async fn infuse_external_ganglion<G>(
249 &mut self,
250 ganglion: Arc<Mutex<G>>,
251 ) -> Result<(), PlexusError>
252 where
253 G: GanglionExternal + Send + Sync + 'static,
254 {
255 let ganglion_id = {
257 let ganglion_guard = ganglion.lock().await;
258 ganglion_guard.unique_id()
259 };
260
261 self.external_ganglia
262 .write()
263 .await
264 .insert(ganglion_id, ganglion);
265
266 self.update_neuron_ganglia().await?;
267 Ok(())
268 }
269
270 pub async fn excise_ganglion_by_id(
272 &mut self,
273 ganglion_id: Uuid,
274 ) -> Result<Option<Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>, PlexusError> {
275 let removed = self.internal_ganglia.write().await.remove(&ganglion_id);
276
277 if removed.is_some() {
278 self.update_neuron_ganglia().await?;
279 }
280
281 Ok(removed)
282 }
283
284 pub async fn excise_ganglion<G>(
286 &mut self,
287 ganglion: Arc<Mutex<G>>,
288 ) -> Result<Option<Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>, PlexusError>
289 where
290 G: GanglionInternal + Send + Sync + ?Sized + 'static,
291 {
292 let ganglion_id = {
293 let guard = ganglion.lock().await;
294 guard.unique_id()
295 };
296 self.excise_ganglion_by_id(ganglion_id).await
297 }
298
299 pub async fn excise_external_ganglion_by_id(
301 &mut self,
302 ganglion_id: Uuid,
303 ) -> Result<Option<Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>, PlexusError> {
304 let removed = self.external_ganglia.write().await.remove(&ganglion_id);
305
306 if removed.is_some() {
307 self.update_neuron_ganglia().await?;
308 }
309
310 Ok(removed)
311 }
312
313 pub async fn excise_external_ganglion<G>(
315 &mut self,
316 ganglion: Arc<Mutex<G>>,
317 ) -> Result<Option<Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>, PlexusError>
318 where
319 G: GanglionExternal + Send + Sync + ?Sized + 'static,
320 {
321 let ganglion_id = {
322 let guard = ganglion.lock().await;
323 guard.unique_id()
324 };
325 self.excise_external_ganglion_by_id(ganglion_id).await
326 }
327
328 pub async fn update_neurons(
330 &self,
331 neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
332 ) -> Result<(), PlexusError> {
333 let mut neurons = self.neurons.write().await;
334 neurons.insert(neuron.name(), neuron);
335 drop(neurons);
336
337 self.update_neuron_ganglia().await?;
339 Ok(())
340 }
341
342 async fn update_neuron_ganglia(&self) -> Result<(), PlexusError> {
344 update_neuron_ganglia_internal(
345 &self.neurons,
346 &self.internal_ganglia,
347 &self.external_ganglia,
348 &self.neuron_ganglia,
349 &self.reactant_factories,
350 &self.reactions,
351 )
352 .await
353 }
354}
355
356impl Ganglion for Plexus {
357 fn capable<T, C>(&mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
358 where
359 C: Codec<T> + CodecName + Send + Sync + 'static,
360 T: Send + Sync + 'static,
361 {
362 let neuron_name = neuron.name();
363
364 if !self.relevant_neurons.is_empty() && !self.relevant_neurons.contains(&neuron_name) {
366 return false;
367 }
368
369 if self.ignored_neurons.contains(&neuron_name) {
371 return false;
372 }
373
374 true
375 }
376
377 fn adapt<T, C>(
378 &mut self,
379
380 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
381 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>
382 where
383 C: Codec<T> + CodecName + Send + Sync + 'static,
384 T: Send + Sync + 'static,
385 {
386 tracing::debug!(neuron = %neuron.name(), "Plexus::adapt - Adapting neuron");
387
388 if !self.capable(neuron.clone()) {
391 return Box::pin(async move { Ok(()) });
392 }
393
394 let inproc_ganglion = self.inproc_ganglion.clone();
395
396 let erased_neuron = erase_neuron(neuron.clone());
397
398 let reactions = self.reactions.clone();
399
400 let reactant_factories = self.reactant_factories.clone();
401
402 let neurons = self.neurons.clone();
403
404 let id = self.id;
405
406 let internal_ganglia = self.internal_ganglia.clone();
407
408 let external_ganglia = self.external_ganglia.clone();
409
410 let neuron_ganglia = self.neuron_ganglia.clone();
411
412 Box::pin(async move {
415 let future = {
416 let mut inproc_ganglion_guard = inproc_ganglion.lock().await;
417
418 inproc_ganglion_guard.adapt(neuron.clone())
419 };
420
421 let result = future.await;
422
423 let factories = PlexusReactantFactories {
424 internal_factory: Arc::new(PlexusInternalReactantFactory::<T, C>::new()),
425
426 external_internal_factory: Arc::new(
427 PlexusExternalInternalReactantFactory::<T, C>::new(),
428 ),
429
430 external_external_factory: Arc::new(
431 PlexusExternalExternalReactantFactory::<T, C>::new(),
432 ),
433 };
434
435 reactant_factories
436 .write()
437 .await
438 .insert(neuron.name(), factories);
439
440 if let Err(plexus_error) = update_neurons_internal(
443 &neurons,
444 &internal_ganglia,
445 &external_ganglia,
446 &neuron_ganglia,
447 &reactant_factories,
448 &reactions,
449 erased_neuron,
450 )
451 .await
452 {
453 return Err(GanglionError::from_plexus_error(
456 plexus_error,
457 neuron.name(),
458 "Plexus".to_string(),
459 id,
460 ));
461 }
462
463 result
464 })
465 }
466}
467
468#[allow(clippy::type_complexity)]
469async fn update_neurons_internal(
470 neurons_map: &Arc<RwLock<HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>>>,
471
472 internal_ganglia: &Arc<
473 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
474 >,
475
476 external_ganglia: &Arc<
477 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
478 >,
479
480 neuron_ganglia: &Arc<RwLock<HashSet<(String, Uuid)>>>,
481
482 reactant_factories: &Arc<RwLock<HashMap<String, PlexusReactantFactories>>>,
483
484 reactions: &Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
485
486 neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
487) -> Result<(), PlexusError> {
488 let mut neurons = neurons_map.write().await;
489
490 neurons.insert(neuron.name(), neuron);
491
492 drop(neurons);
493
494 update_neuron_ganglia_internal(
497 neurons_map,
498 internal_ganglia,
499 external_ganglia,
500 neuron_ganglia,
501 reactant_factories,
502 reactions,
503 )
504 .await?;
505
506 Ok(())
507}
508
509#[allow(clippy::type_complexity)]
510async fn update_neuron_ganglia_internal(
511 neurons_map: &Arc<RwLock<HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>>>,
512
513 internal_ganglia_map: &Arc<
514 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
515 >,
516
517 external_ganglia_map: &Arc<
518 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
519 >,
520
521 neuron_ganglia_set: &Arc<RwLock<HashSet<(String, Uuid)>>>,
522
523 reactant_factories_map: &Arc<RwLock<HashMap<String, PlexusReactantFactories>>>,
524
525 reactions: &Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
526) -> Result<(), PlexusError> {
527 let neurons = neurons_map.read().await;
528
529 let internal_ganglia = internal_ganglia_map.read().await;
530
531 let external_ganglia = external_ganglia_map.read().await;
532
533 let all_internal_neuron_ganglia: HashSet<(String, Uuid)> = neurons
536 .iter()
537 .cartesian_product(internal_ganglia.iter())
538 .map(|((neuron_name, _neuron), (ganglion_id, _ganglion_mutex))| {
539 (neuron_name.clone(), *ganglion_id)
540 })
541 .collect();
542
543 let all_external_neuron_ganglia: HashSet<(String, Uuid)> = neurons
546 .iter()
547 .cartesian_product(external_ganglia.iter())
548 .map(|((neuron_name, _neuron), (ganglion_id, _ganglion_mutex))| {
549 (neuron_name.clone(), *ganglion_id)
550 })
551 .collect();
552
553 let mut neuron_ganglia = neuron_ganglia_set.write().await;
554
555 let new_internal_combinations: Vec<(String, Uuid)> = all_internal_neuron_ganglia
556 .difference(&neuron_ganglia)
557 .cloned()
558 .collect();
559
560 let new_external_combinations: Vec<(String, Uuid)> = all_external_neuron_ganglia
561 .difference(&neuron_ganglia)
562 .cloned()
563 .collect();
564
565 *neuron_ganglia = all_internal_neuron_ganglia
566 .union(&all_external_neuron_ganglia)
567 .cloned()
568 .collect();
569
570 drop(neuron_ganglia);
571
572 let mut internal_reactants_by_ganglion: HashMap<
575 (String, Uuid),
576 Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
577 > = HashMap::new();
578
579 for (neuron_name, ganglion_id) in new_internal_combinations {
580 let internal_reactant = reactant_factories_map
581 .read()
582 .await
583 .get(&neuron_name)
584 .ok_or_else(|| PlexusError::ReactantCreation {
585 neuron_name: neuron_name.clone(),
586 })?
587 .internal_factory
588 .create_reactant(
589 ganglion_id,
590 internal_ganglia_map.clone(),
591 external_ganglia_map.clone(),
592 reactions.clone(),
593 );
594
595 internal_reactants_by_ganglion
596 .entry((neuron_name, ganglion_id))
597 .or_default()
598 .push(internal_reactant);
599 }
600
601 for ((neuron_name, ganglion_id), reactants) in internal_reactants_by_ganglion {
604 if let Some(ganglion_mutex) = internal_ganglia.get(&ganglion_id) {
605 let mut ganglion = ganglion_mutex.lock().await;
606
607 ganglion.react(neuron_name, reactants, vec![]).await?;
610 }
611 }
612
613 #[allow(clippy::type_complexity)]
616 let mut external_reactants_by_ganglion: HashMap<
617 (String, Uuid),
618 (
619 Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
620 Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
621 ),
622 > = HashMap::new();
623
624 for (neuron_name, ganglion_id) in new_external_combinations {
625 let external_internal_reactant = reactant_factories_map
626 .read()
627 .await
628 .get(&neuron_name)
629 .ok_or_else(|| PlexusError::ReactantCreation {
630 neuron_name: neuron_name.clone(),
631 })?
632 .external_internal_factory
633 .create_reactant(ganglion_id, internal_ganglia_map.clone(), reactions.clone());
634
635 let external_external_reactant = reactant_factories_map
636 .read()
637 .await
638 .get(&neuron_name)
639 .ok_or_else(|| PlexusError::ReactantCreation {
640 neuron_name: neuron_name.clone(),
641 })?
642 .external_external_factory
643 .create_reactant(ganglion_id, external_ganglia_map.clone(), reactions.clone());
644
645 let entry = external_reactants_by_ganglion
646 .entry((neuron_name.clone(), ganglion_id))
647 .or_default();
648
649 entry.0.push(external_internal_reactant);
650
651 entry.1.push(external_external_reactant);
652 }
653
654 for ((neuron_name, ganglion_id), (reactants, reactants_raw)) in external_reactants_by_ganglion {
657 if let Some(ganglion_mutex) = external_ganglia.get(&ganglion_id) {
658 let mut ganglion = ganglion_mutex.lock().await;
659
660 ganglion
661 .react(neuron_name, reactants, reactants_raw, vec![])
662 .await?;
663 }
664 }
665
666 Ok(())
667}
668
669impl GanglionInternal for Plexus {
670 fn transmit(
671 &mut self,
672
673 payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
674 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>> {
675 let inproc_ganglion = self.inproc_ganglion.clone();
676
677 Box::pin(async move {
678 let future = {
679 let mut ganglion = inproc_ganglion.lock().await;
680
681 ganglion.transmit(payload)
682 };
683
684 future.await
685 })
686 }
687
688 fn react(
689 &mut self,
690 neuron_name: String,
691 reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
692 error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
693 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
694 let inproc_ganglion = self.inproc_ganglion.clone();
695
696 Box::pin(async move {
697 let future = {
698 let mut ganglion = inproc_ganglion.lock().await;
699
700 ganglion.react(neuron_name, reactants, error_reactants)
701 };
702
703 future.await
704 })
705 }
706
707 fn unique_id(&self) -> Uuid {
708 self.id
709 }
710}
711
712#[cfg(test)]
713mod tests {
714 use super::*;
715 use crate::erasure::payload::{erase_payload, erase_payload_raw};
716 use crate::erasure::reactant::{erase_reactant, erase_reactant_raw};
717 use crate::logging::TraceContext;
718 use crate::neuron::NeuronImpl;
719 use crate::payload::{Payload, PayloadRaw};
720 use crate::test_utils::{
721 DebugCodec, DebugStruct, GanglionExternalInprocess, TokioMpscReactant,
722 TokioMpscReactantRaw, test_namespace,
723 };
724 use tokio::sync::mpsc::channel;
725 use uuid::Uuid;
726
727 #[test]
728 fn test_cartesian_product_equivalence() {
729 let neurons = [("neuron1", "Neuron1"), ("neuron2", "Neuron2")];
731 let ganglia = [("ganglion1", "uuid1"), ("ganglion2", "uuid2")];
732
733 let mut old_combinations = HashSet::new();
735 for (neuron_name, _neuron) in neurons.iter() {
736 for (ganglion_id, _ganglion_mutex) in ganglia.iter() {
737 old_combinations.insert((neuron_name.to_string(), ganglion_id.to_string()));
738 }
739 }
740
741 let new_combinations: HashSet<(String, String)> = neurons
743 .iter()
744 .cartesian_product(ganglia.iter())
745 .map(|((neuron_name, _neuron), (ganglion_id, _ganglion_mutex))| {
746 (neuron_name.to_string(), ganglion_id.to_string())
747 })
748 .collect();
749
750 assert_eq!(old_combinations, new_combinations);
752 assert_eq!(old_combinations.len(), 4); }
754
755 #[tokio::test]
756 async fn test_plexus_creation() {
757 let plexus: Plexus = Plexus::new(vec![], vec![]).await;
758
759 assert_eq!(plexus.relevant_neurons.len(), 0);
760 assert_eq!(plexus.ignored_neurons.len(), 0);
761 }
762
763 #[tokio::test]
764 async fn test_plexus_capable() {
765 let ns = test_namespace();
766 let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;
767
768 let neuron: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
769 let neuron_arc = Arc::new(neuron);
770
771 assert!(plexus.capable(neuron_arc));
772 }
773
774 #[tokio::test]
775 async fn test_plexus_adapt() {
776 let ns = test_namespace();
777 let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;
778
779 let neuron: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
780
781 plexus
782 .adapt(Arc::new(neuron))
783 .await
784 .expect("Failed to adapt neuron");
785
786 let neurons = plexus.neurons.read().await;
787 assert_eq!(neurons.len(), 1);
788 }
789
790 #[tokio::test]
791 async fn test_plexus_ganglion_internal_adapt() {
792 let ns = test_namespace();
793 let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;
794
795 let neuron: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
796 let neuron_arc = Arc::new(neuron);
797
798 assert!(plexus.capable(neuron_arc.clone()));
800
801 plexus
803 .adapt(neuron_arc)
804 .await
805 .expect("Failed to adapt neuron");
806
807 let neurons = plexus.neurons.read().await;
808 assert_eq!(neurons.len(), 1);
809 }
810
811 #[tokio::test]
812 async fn test_plexus_external_inprocess_transmit_via_adapt() {
813 let ns = test_namespace();
814
815 let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
816 let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
817 let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
818 let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
819 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
820 let neuron_arc = neuron_impl.clone_to_arc();
821
822 let mut ganglion_inprocess = GanglionInprocess::new();
824 let mut ganglion_external_inprocess = GanglionExternalInprocess::new();
825
826 let _ = ganglion_inprocess.adapt(neuron_arc.clone()).await;
828 ganglion_external_inprocess
829 .adapt(neuron_arc.clone())
830 .await
831 .expect("Failed to adapt neuron");
832
833 let erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
835 erase_reactant(Box::new(TokioMpscReactant::new(tx1))),
836 erase_reactant(Box::new(TokioMpscReactant::new(tx2))),
837 ];
838
839 let erased_raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
840 erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx1))),
841 erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx2))),
842 ];
843
844 ganglion_inprocess
846 .react(neuron_arc.name(), erased_reactants, vec![])
847 .await
848 .expect("Failed to react");
849 ganglion_external_inprocess
850 .react(neuron_arc.name(), vec![], erased_raw_reactants, vec![])
851 .await
852 .expect("Failed to react raw");
853
854 let test_payload = Payload::with_correlation(
856 DebugStruct {
857 foo: 42,
858 bar: "test".to_string(),
859 },
860 neuron_arc.clone(),
861 None,
862 );
863
864 let test_payload_raw = PayloadRaw::with_correlation(
865 DebugCodec::encode(&DebugStruct {
866 foo: 42,
867 bar: "test".to_string(),
868 })
869 .expect("Failed to encode test data"),
870 Some(neuron_arc.clone()),
871 None,
872 );
873
874 let erased_payload = erase_payload(test_payload);
876 ganglion_inprocess
877 .transmit(erased_payload)
878 .await
879 .expect("Failed to transmit");
880
881 let erased_payload_raw = erase_payload_raw(test_payload_raw);
883 ganglion_external_inprocess
884 .transmit_encoded(erased_payload_raw)
885 .await
886 .expect("Failed to transmit encoded");
887
888 let received1 = rx1.recv().await.expect("Failed to receive payload 1");
890 let received2 = rx2.recv().await.expect("Failed to receive payload 2");
891 let raw_received1 = raw_rx1
892 .recv()
893 .await
894 .expect("Failed to receive raw payload 1");
895 let raw_received2 = raw_rx2
896 .recv()
897 .await
898 .expect("Failed to receive raw payload 2");
899
900 assert_eq!(received1.value.foo, 42);
901 assert_eq!(received2.value.foo, 42);
902 let decoded1 =
904 DebugCodec::decode(&raw_received1.value).expect("Failed to decode raw payload 1");
905 let decoded2 =
906 DebugCodec::decode(&raw_received2.value).expect("Failed to decode raw payload 2");
907 assert_eq!(decoded1.foo, 42);
908 assert_eq!(decoded2.foo, 42);
909 }
910
911 #[tokio::test]
912 async fn test_plexus_external_inprocess_across_threads() {
913 let ns = test_namespace();
914
915 let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
916 let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
917 let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
918 let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
919 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
920 let neuron_arc = neuron_impl.clone_to_arc();
921
922 let ganglion_inprocess = Arc::new(Mutex::new(GanglionInprocess::new()));
924 let ganglion_external_inprocess = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
925
926 {
928 let mut ganglion = ganglion_inprocess.lock().await;
929 let _ = ganglion.adapt(neuron_arc.clone()).await;
930 }
931 {
932 let mut ganglion = ganglion_external_inprocess.lock().await;
933 ganglion
934 .adapt(neuron_arc.clone())
935 .await
936 .expect("Failed to adapt neuron");
937 }
938
939 let erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
941 erase_reactant(Box::new(TokioMpscReactant::new(tx1))),
942 erase_reactant(Box::new(TokioMpscReactant::new(tx2))),
943 ];
944
945 let erased_raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
946 erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx1))),
947 erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx2))),
948 ];
949
950 {
952 let mut ganglion = ganglion_inprocess.lock().await;
953 ganglion
954 .react(neuron_arc.name(), erased_reactants, vec![])
955 .await
956 .expect("Failed to react");
957 }
958 {
959 let mut ganglion = ganglion_external_inprocess.lock().await;
960 ganglion
961 .react(neuron_arc.name(), vec![], erased_raw_reactants, vec![])
962 .await
963 .expect("Failed to react raw");
964 }
965
966 let ganglion_inprocess_clone = ganglion_inprocess.clone();
968 let ganglion_external_inprocess_clone = ganglion_external_inprocess.clone();
969
970 let neuron_arc_clone = neuron_arc.clone();
971 let task1 = tokio::task::spawn(async move {
972 let test_payload = Payload::with_correlation(
973 DebugStruct {
974 foo: 42,
975 bar: "test".to_string(),
976 },
977 neuron_arc_clone.clone(),
978 None,
979 );
980
981 let erased_payload = erase_payload(test_payload);
982 let mut ganglion = ganglion_inprocess_clone.lock().await;
983 ganglion
984 .transmit(erased_payload)
985 .await
986 .expect("Failed to transmit");
987 });
988
989 let neuron_arc_clone2 = neuron_arc.clone();
990 let task2 = tokio::task::spawn(async move {
991 let test_payload_raw = PayloadRaw::with_correlation(
992 DebugCodec::encode(&DebugStruct {
993 foo: 42,
994 bar: "test".to_string(),
995 })
996 .expect("Failed to encode test data"),
997 Some(neuron_arc_clone2.clone()),
998 None,
999 );
1000
1001 let erased_payload_raw = erase_payload_raw(test_payload_raw);
1002 let mut ganglion = ganglion_external_inprocess_clone.lock().await;
1003 ganglion
1004 .transmit_encoded(erased_payload_raw)
1005 .await
1006 .expect("Failed to transmit encoded");
1007 });
1008
1009 task1.await.expect("Task 1 failed");
1011 task2.await.expect("Task 2 failed");
1012
1013 let received1 = rx1.recv().await.expect("Failed to receive payload 1");
1015 let received2 = rx2.recv().await.expect("Failed to receive payload 2");
1016 let raw_received1 = raw_rx1
1017 .recv()
1018 .await
1019 .expect("Failed to receive raw payload 1");
1020 let raw_received2 = raw_rx2
1021 .recv()
1022 .await
1023 .expect("Failed to receive raw payload 2");
1024
1025 assert_eq!(received1.value.foo, 42);
1026 assert_eq!(received2.value.foo, 42);
1027 let decoded1 =
1029 DebugCodec::decode(&raw_received1.value).expect("Failed to decode raw payload 1");
1030 let decoded2 =
1031 DebugCodec::decode(&raw_received2.value).expect("Failed to decode raw payload 2");
1032 assert_eq!(decoded1.foo, 42);
1033 assert_eq!(decoded2.foo, 42);
1034 }
1035
1036 #[tokio::test]
1037 async fn test_plexus_transmit_encoded_external_to_internal() {
1038 use crate::erasure::neuron::erase_neuron;
1039 use crate::erasure::payload::erase_payload_raw;
1040 use crate::erasure::reactant::erase_reactant;
1041 use crate::neuron::NeuronImpl;
1042 use crate::payload::PayloadRaw;
1043 use crate::test_utils::{
1044 DebugCodec, DebugStruct, GanglionExternalInprocess, TokioMpscReactant, test_namespace,
1045 };
1046 use tokio::sync::mpsc::channel;
1047 use tokio::time::{Duration, sleep};
1048
1049 println!("[DEBUG] TEST - Starting test_plexus_transmit_encoded_external_to_internal");
1050
1051 let ns = test_namespace();
1052 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1053 let neuron_arc = neuron_impl.clone_to_arc();
1054
1055 let (tx_internal, mut rx_internal) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
1057
1058 let external_ganglion = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
1060 let mut internal_plexus = Plexus::new(vec![erase_neuron(neuron_arc.clone())], vec![]).await;
1061
1062 {
1064 let mut g = external_ganglion.lock().await;
1065 g.adapt(neuron_arc.clone())
1066 .await
1067 .expect("Failed to adapt neuron to external ganglion");
1068 }
1069 internal_plexus
1070 .adapt(neuron_arc.clone())
1071 .await
1072 .expect("Failed to adapt neuron to internal plexus");
1073
1074 let reactants = vec![erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(
1076 TokioMpscReactant::new(tx_internal),
1077 ))];
1078 internal_plexus
1079 .react(neuron_arc.name(), reactants, vec![])
1080 .await
1081 .expect("Failed to add reactant to internal plexus");
1082
1083 let _ = internal_plexus
1085 .infuse_external_ganglion(external_ganglion.clone())
1086 .await;
1087
1088 let test_data = DebugStruct {
1090 foo: 123,
1091 bar: "plexus_transmit_encoded_test".to_string(),
1092 };
1093 let encoded_data = neuron_impl
1094 .encode(&test_data)
1095 .expect("Failed to encode test data");
1096 let correlation_id = Uuid::now_v7();
1097
1098 let span_id = Uuid::now_v7().as_u128() as u64;
1100 let payload_raw = Arc::new(PayloadRaw {
1101 value: Arc::new(encoded_data),
1102 neuron: Some(neuron_arc.clone()),
1103 trace: TraceContext::from_parts(correlation_id, span_id, None),
1104 });
1105
1106 println!("[DEBUG] TEST - About to call transmit_encoded on external ganglion");
1108 {
1109 let mut g = external_ganglion.lock().await;
1110 let erased_payload_raw = erase_payload_raw(payload_raw.clone());
1111 g.transmit_encoded(erased_payload_raw)
1112 .await
1113 .expect("Failed to transmit encoded payload");
1114 }
1115 println!("[DEBUG] TEST - Completed transmit_encoded call");
1116
1117 sleep(Duration::from_millis(100)).await;
1119
1120 assert_eq!(
1122 rx_internal.len(),
1123 1,
1124 "Internal plexus should have received exactly one payload"
1125 );
1126 let received_payload = rx_internal
1127 .recv()
1128 .await
1129 .expect("Should receive decoded payload in internal plexus");
1130
1131 assert_eq!(
1133 received_payload.value.foo, test_data.foo,
1134 "Decoded payload should have correct foo value"
1135 );
1136 assert_eq!(
1137 received_payload.value.bar, test_data.bar,
1138 "Decoded payload should have correct bar value"
1139 );
1140 assert_eq!(
1141 received_payload.correlation_id(), correlation_id,
1142 "Correlation ID should match"
1143 );
1144
1145 println!(
1146 "[DEBUG] TEST - Successfully verified that PayloadRaw from transmit_encoded was received as decoded Payload in Plexus"
1147 );
1148 }
1149
1150 #[tokio::test]
1151 async fn test_plexus_ganglion_excision() {
1152 let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;
1153
1154 let internal_ganglion_1 = Arc::new(Mutex::new(GanglionInprocess::new()));
1156 let internal_id_1 = { internal_ganglion_1.lock().await.unique_id() };
1157
1158 plexus
1159 .infuse_ganglion(internal_ganglion_1.clone())
1160 .await
1161 .expect("Failed to infuse internal ganglion 1");
1162
1163 {
1164 let internal_ganglia = plexus.internal_ganglia.read().await;
1165 assert!(internal_ganglia.contains_key(&internal_id_1));
1166 }
1167
1168 let excised_internal_1 = plexus
1169 .excise_ganglion_by_id(internal_id_1)
1170 .await
1171 .expect("Failed to excise internal ganglion 1 by ID");
1172 assert!(excised_internal_1.is_some());
1173 assert_eq!(
1174 excised_internal_1.unwrap().lock().await.unique_id(),
1175 internal_id_1
1176 );
1177
1178 {
1179 let internal_ganglia = plexus.internal_ganglia.read().await;
1180 assert!(!internal_ganglia.contains_key(&internal_id_1));
1181 }
1182
1183 let internal_ganglion_2 = Arc::new(Mutex::new(GanglionInprocess::new()));
1185 let internal_id_2 = { internal_ganglion_2.lock().await.unique_id() };
1186
1187 plexus
1188 .infuse_ganglion(internal_ganglion_2.clone())
1189 .await
1190 .expect("Failed to infuse internal ganglion 2");
1191
1192 let excised_internal_2 = plexus
1193 .excise_ganglion(internal_ganglion_2.clone())
1194 .await
1195 .expect("Failed to excise internal ganglion 2 by instance");
1196 assert!(excised_internal_2.is_some());
1197 assert_eq!(
1198 excised_internal_2.unwrap().lock().await.unique_id(),
1199 internal_id_2
1200 );
1201
1202 {
1203 let internal_ganglia = plexus.internal_ganglia.read().await;
1204 assert!(!internal_ganglia.contains_key(&internal_id_2));
1205 }
1206
1207 let external_ganglion_1 = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
1209 let external_id_1 = { external_ganglion_1.lock().await.unique_id() };
1210
1211 plexus
1212 .infuse_external_ganglion(external_ganglion_1.clone())
1213 .await
1214 .expect("Failed to infuse external ganglion 1");
1215
1216 {
1217 let external_ganglia = plexus.external_ganglia.read().await;
1218 assert!(external_ganglia.contains_key(&external_id_1));
1219 }
1220
1221 let excised_external_1 = plexus
1222 .excise_external_ganglion_by_id(external_id_1)
1223 .await
1224 .expect("Failed to excise external ganglion 1 by ID");
1225 assert!(excised_external_1.is_some());
1226 assert_eq!(
1227 excised_external_1.unwrap().lock().await.unique_id(),
1228 external_id_1
1229 );
1230
1231 {
1232 let external_ganglia = plexus.external_ganglia.read().await;
1233 assert!(!external_ganglia.contains_key(&external_id_1));
1234 }
1235
1236 let external_ganglion_2 = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
1238 let external_id_2 = { external_ganglion_2.lock().await.unique_id() };
1239
1240 plexus
1241 .infuse_external_ganglion(external_ganglion_2.clone())
1242 .await
1243 .expect("Failed to infuse external ganglion 2");
1244
1245 let excised_external_2 = plexus
1246 .excise_external_ganglion(external_ganglion_2.clone())
1247 .await
1248 .expect("Failed to excise external ganglion 2 by instance");
1249 assert!(excised_external_2.is_some());
1250 assert_eq!(
1251 excised_external_2.unwrap().lock().await.unique_id(),
1252 external_id_2
1253 );
1254
1255 {
1256 let external_ganglia = plexus.external_ganglia.read().await;
1257 assert!(!external_ganglia.contains_key(&external_id_2));
1258 }
1259 }
1260
1261 #[tokio::test]
1262 async fn test_weak_key_hash_map_reaction_cleanup() {
1263 }
1265}