1use crate::axon::AxonError;
8use crate::codec::{Codec, CodecName};
9use crate::erasure::neuron::{NeuronErased, erase_neuron};
10use crate::erasure::payload::{PayloadErased, erase_payload, erase_payload_raw};
11use crate::erasure::reactant::{
12 ErrorReactantErased, ReactantErased, ReactantRawErased, erase_reactant, erase_reactant_raw,
13};
14use crate::ganglion::{
15 Ganglion, GanglionError, GanglionExternal, GanglionInprocess, GanglionInternal,
16};
17use crate::logging::LogTrace;
18use crate::neuron::Neuron;
19use crate::payload::{Payload, PayloadRaw};
20use crate::reactant::{Reactant, ReactantRaw};
21use futures_util::future::join_all;
22use itertools::Itertools;
23use moka::future::Cache;
24use std::collections::{HashMap, HashSet};
25use std::future::Future;
26use std::marker::PhantomData;
27use std::pin::Pin;
28use std::sync::Arc;
29use std::time::Duration;
30use thiserror::Error;
31use tokio::sync::{Mutex, RwLock};
32use tracing::{Instrument};
33use uuid::Uuid;
34
35#[derive(Error, Debug)]
36pub enum PlexusError {
37 #[error("Ganglion error: {0}")]
38 Ganglion(#[from] GanglionError),
39 #[error("Failed to acquire lock on external ganglia")]
40 ExternalGangliaLock,
41 #[error("Failed to acquire lock on internal ganglia")]
42 InternalGangliaLock,
43 #[error("Failed to acquire lock on neurons")]
44 NeuronsLock,
45 #[error("Failed to acquire lock on reactant factories")]
46 ReactantFactoriesLock,
47 #[error("Failed to acquire lock on neuron ganglia")]
48 NeuronGangliaLock,
49 #[error("Failed to acquire lock on reactions")]
50 ReactionsLock,
51 #[error("Neuron adaptation failed for {neuron_name}")]
52 NeuronAdaptation { neuron_name: String },
53 #[error("Reactant creation failed for {neuron_name}")]
54 ReactantCreation { neuron_name: String },
55 #[error("Transmission failed")]
56 Transmission,
57}
58
59impl From<AxonError> for PlexusError {
60 fn from(error: AxonError) -> Self {
61 match error {
62 AxonError::GanglionError(e) => PlexusError::Ganglion(e),
63 AxonError::NeuronNotAdapted {
64 neuron_name,
65 ganglion_name,
66 ganglion_id,
67 } => PlexusError::Ganglion(GanglionError::SynapseNotFound {
68 neuron_name,
69 ganglion_name,
70 ganglion_id,
71 }),
72 AxonError::SynapseLock {
73 neuron_name,
74 ganglion_name,
75 ganglion_id,
76 } => PlexusError::Ganglion(GanglionError::SynapseLock {
77 neuron_name,
78 ganglion_name,
79 ganglion_id,
80 }),
81 AxonError::Transmit {
82 neuron_name,
83 ganglion_name,
84 ganglion_id,
85 message,
86 } => PlexusError::Ganglion(GanglionError::Transmit {
87 neuron_name,
88 ganglion_name,
89 ganglion_id,
90 message,
91 }),
92 AxonError::Encode {
93 neuron_name,
94 ganglion_name,
95 ganglion_id,
96 } => PlexusError::Ganglion(GanglionError::Encode {
97 neuron_name,
98 ganglion_name,
99 ganglion_id,
100 }),
101 AxonError::Decode {
102 neuron_name,
103 ganglion_name,
104 ganglion_id,
105 } => PlexusError::Ganglion(GanglionError::Decode {
106 neuron_name,
107 ganglion_name,
108 ganglion_id,
109 }),
110 AxonError::TransmissionTimeout => PlexusError::Transmission,
111 }
112 }
113}
114
115pub struct PlexusReactantFactories {
116 pub internal_factory: Arc<dyn ErasedInternalReactantFactory + Send + Sync>,
117 pub external_internal_factory: Arc<dyn ErasedExternalInternalReactantFactory + Send + Sync>,
118 pub external_external_factory: Arc<dyn ErasedExternalExternalReactantFactory + Send + Sync>,
119}
120
121#[allow(clippy::type_complexity)]
123pub struct Plexus {
124 id: Uuid,
126
127 inproc_ganglion: Arc<Mutex<GanglionInprocess>>,
129
130 external_ganglia:
132 Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>>,
133
134 internal_ganglia:
136 Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>>,
137
138 neurons: Arc<RwLock<HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>>>,
140
141 reactant_factories: Arc<RwLock<HashMap<String, PlexusReactantFactories>>>,
143
144 neuron_ganglia: Arc<RwLock<HashSet<(String, Uuid)>>>,
146
147 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
150
151 relevant_neurons: HashSet<String>,
153
154 ignored_neurons: HashSet<String>,
156}
157
158impl Plexus {
159 pub async fn neurons(
161 &self,
162 ) -> tokio::sync::RwLockReadGuard<
163 '_,
164 HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>,
165 > {
166 self.neurons.read().await
167 }
168
169 pub async fn new(
171 relevant_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
172 ignored_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
173 ) -> Self {
174 let inproc_ganglion = Arc::new(Mutex::new(GanglionInprocess::new()));
175
176 let relevant_neuron_names = relevant_neurons
177 .iter()
178 .map(|n| n.name())
179 .collect::<HashSet<String>>();
180
181 let ignored_neuron_names = ignored_neurons
182 .iter()
183 .map(|n| n.name())
184 .collect::<HashSet<String>>();
185
186 let reactions = Cache::builder()
187 .time_to_idle(Duration::from_secs(60)) .build();
189
190 let plexus = Self {
191 id: Uuid::now_v7(),
192 inproc_ganglion: inproc_ganglion.clone(),
193 external_ganglia: Arc::new(RwLock::new(HashMap::new())),
194 internal_ganglia: Arc::new(RwLock::new(HashMap::new())),
195 neurons: Arc::new(RwLock::new(HashMap::new())),
196 reactant_factories: Arc::new(RwLock::new(HashMap::new())),
197 neuron_ganglia: Arc::new(RwLock::new(HashSet::new())),
198 reactions,
199 relevant_neurons: relevant_neuron_names,
200 ignored_neurons: ignored_neuron_names,
201 };
202
203 let ganglion_id = {
204 let ganglion_guard = inproc_ganglion.lock().await;
205 ganglion_guard.unique_id()
206 };
207 plexus
208 .internal_ganglia
209 .write()
210 .await
211 .insert(ganglion_id, inproc_ganglion);
212
213 plexus
214 }
215
216 pub async fn new_shared(
218 relevant_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
219 ignored_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
220 ) -> Arc<Mutex<Self>> {
221 Arc::new(Mutex::new(
222 Self::new(relevant_neurons, ignored_neurons).await,
223 ))
224 }
225
226 pub async fn infuse_ganglion<G>(&mut self, ganglion: Arc<Mutex<G>>) -> Result<(), PlexusError>
228 where
229 G: GanglionInternal + Ganglion + Send + Sync + 'static,
230 {
231 let ganglion_id = {
232 let ganglion_guard = ganglion.lock().await;
233 ganglion_guard.unique_id()
234 };
235
236 self.internal_ganglia
237 .write()
238 .await
239 .insert(ganglion_id, ganglion.clone());
240
241 self.update_neuron_ganglia().await?;
242 Ok(())
243 }
244
245 pub async fn infuse_external_ganglion<G>(
247 &mut self,
248 ganglion: Arc<Mutex<G>>,
249 ) -> Result<(), PlexusError>
250 where
251 G: GanglionExternal + Send + Sync + 'static,
252 {
253 let ganglion_id = {
254 let ganglion_guard = ganglion.lock().await;
255 ganglion_guard.unique_id()
256 };
257
258 self.external_ganglia
259 .write()
260 .await
261 .insert(ganglion_id, ganglion);
262
263 self.update_neuron_ganglia().await?;
264 Ok(())
265 }
266
267 pub async fn excise_ganglion_by_id(
269 &mut self,
270 ganglion_id: Uuid,
271 ) -> Result<Option<Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>, PlexusError> {
272 let removed = self.internal_ganglia.write().await.remove(&ganglion_id);
273
274 if removed.is_some() {
275 self.update_neuron_ganglia().await?;
276 }
277
278 Ok(removed)
279 }
280
281 pub async fn excise_ganglion<G>(
283 &mut self,
284 ganglion: Arc<Mutex<G>>,
285 ) -> Result<Option<Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>, PlexusError>
286 where
287 G: GanglionInternal + Send + Sync + ?Sized + 'static,
288 {
289 let ganglion_id = {
290 let guard = ganglion.lock().await;
291 guard.unique_id()
292 };
293 self.excise_ganglion_by_id(ganglion_id).await
294 }
295
296 pub async fn excise_external_ganglion_by_id(
298 &mut self,
299 ganglion_id: Uuid,
300 ) -> Result<Option<Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>, PlexusError> {
301 let removed = self.external_ganglia.write().await.remove(&ganglion_id);
302
303 if removed.is_some() {
304 self.update_neuron_ganglia().await?;
305 }
306
307 Ok(removed)
308 }
309
310 pub async fn excise_external_ganglion<G>(
312 &mut self,
313 ganglion: Arc<Mutex<G>>,
314 ) -> Result<Option<Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>, PlexusError>
315 where
316 G: GanglionExternal + Send + Sync + ?Sized + 'static,
317 {
318 let ganglion_id = {
319 let guard = ganglion.lock().await;
320 guard.unique_id()
321 };
322 self.excise_external_ganglion_by_id(ganglion_id).await
323 }
324
325 pub async fn update_neurons(
327 &self,
328 neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
329 ) -> Result<(), PlexusError> {
330 let mut neurons = self.neurons.write().await;
331 neurons.insert(neuron.name(), neuron);
332 drop(neurons);
333
334 self.update_neuron_ganglia().await?;
335 Ok(())
336 }
337
338 async fn update_neuron_ganglia(&self) -> Result<(), PlexusError> {
340 update_neuron_ganglia_internal(
341 &self.neurons,
342 &self.internal_ganglia,
343 &self.external_ganglia,
344 &self.neuron_ganglia,
345 &self.reactant_factories,
346 &self.reactions,
347 )
348 .await
349 }
350}
351
352impl Ganglion for Plexus {
353 fn capable<T, C>(&mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
354 where
355 C: Codec<T> + CodecName + Send + Sync + 'static,
356 T: Send + Sync + 'static,
357 {
358 let neuron_name = neuron.name();
359
360 if !self.relevant_neurons.is_empty() && !self.relevant_neurons.contains(&neuron_name) {
361 return false;
362 }
363
364 if self.ignored_neurons.contains(&neuron_name) {
365 return false;
366 }
367
368 true
369 }
370
371 fn adapt<T, C>(
372 &mut self,
373 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
374 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>
375 where
376 C: Codec<T> + CodecName + Send + Sync + 'static,
377 T: Send + Sync + 'static,
378 {
379 tracing::debug!(neuron = %neuron.name(), "Plexus::adapt - Adapting neuron");
380
381 if !self.capable(neuron.clone()) {
382 return Box::pin(async move { Ok(()) });
383 }
384
385 let inproc_ganglion = self.inproc_ganglion.clone();
386 let erased_neuron = erase_neuron(neuron.clone());
387 let reactions = self.reactions.clone();
388 let reactant_factories = self.reactant_factories.clone();
389 let neurons = self.neurons.clone();
390 let id = self.id;
391 let internal_ganglia = self.internal_ganglia.clone();
392 let external_ganglia = self.external_ganglia.clone();
393 let neuron_ganglia = self.neuron_ganglia.clone();
394
395 Box::pin(async move {
396 let future = {
397 let mut inproc_ganglion_guard = inproc_ganglion.lock().await;
398 inproc_ganglion_guard.adapt(neuron.clone())
399 };
400
401 let result = future.await;
402
403 let factories = PlexusReactantFactories {
404 internal_factory: Arc::new(PlexusInternalReactantFactory::<T, C>::new()),
405 external_internal_factory: Arc::new(
406 PlexusExternalInternalReactantFactory::<T, C>::new(),
407 ),
408 external_external_factory: Arc::new(
409 PlexusExternalExternalReactantFactory::<T, C>::new(),
410 ),
411 };
412
413 reactant_factories
414 .write()
415 .await
416 .insert(neuron.name(), factories);
417
418 if let Err(plexus_error) = update_neurons_internal(
419 &neurons,
420 &internal_ganglia,
421 &external_ganglia,
422 &neuron_ganglia,
423 &reactant_factories,
424 &reactions,
425 erased_neuron,
426 )
427 .await
428 {
429 return Err(GanglionError::from_plexus_error(
430 plexus_error,
431 neuron.name(),
432 "Plexus".to_string(),
433 id,
434 ));
435 }
436
437 result
438 })
439 }
440}
441
442#[allow(clippy::type_complexity)]
443async fn update_neurons_internal(
444 neurons_map: &Arc<RwLock<HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>>>,
445 internal_ganglia: &Arc<
446 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
447 >,
448 external_ganglia: &Arc<
449 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
450 >,
451 neuron_ganglia: &Arc<RwLock<HashSet<(String, Uuid)>>>,
452 reactant_factories: &Arc<RwLock<HashMap<String, PlexusReactantFactories>>>,
453 reactions: &Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
454 neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
455) -> Result<(), PlexusError> {
456 let mut neurons = neurons_map.write().await;
457 neurons.insert(neuron.name(), neuron);
458 drop(neurons);
459
460 update_neuron_ganglia_internal(
461 neurons_map,
462 internal_ganglia,
463 external_ganglia,
464 neuron_ganglia,
465 reactant_factories,
466 reactions,
467 )
468 .await?;
469
470 Ok(())
471}
472
473#[allow(clippy::type_complexity)]
474async fn update_neuron_ganglia_internal(
475 neurons_map: &Arc<RwLock<HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>>>,
476 internal_ganglia_map: &Arc<
477 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
478 >,
479 external_ganglia_map: &Arc<
480 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
481 >,
482 neuron_ganglia_set: &Arc<RwLock<HashSet<(String, Uuid)>>>,
483 reactant_factories_map: &Arc<RwLock<HashMap<String, PlexusReactantFactories>>>,
484 reactions: &Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
485) -> Result<(), PlexusError> {
486 let neurons = neurons_map.read().await;
487 let internal_ganglia = internal_ganglia_map.read().await;
488 let external_ganglia = external_ganglia_map.read().await;
489
490 let all_internal_neuron_ganglia: HashSet<(String, Uuid)> = neurons
491 .iter()
492 .cartesian_product(internal_ganglia.iter())
493 .map(|((neuron_name, _neuron), (ganglion_id, _ganglion_mutex))| {
494 (neuron_name.clone(), *ganglion_id)
495 })
496 .collect();
497
498 let all_external_neuron_ganglia: HashSet<(String, Uuid)> = neurons
499 .iter()
500 .cartesian_product(external_ganglia.iter())
501 .map(|((neuron_name, _neuron), (ganglion_id, _ganglion_mutex))| {
502 (neuron_name.clone(), *ganglion_id)
503 })
504 .collect();
505
506 let mut neuron_ganglia = neuron_ganglia_set.write().await;
507 let new_internal_combinations: Vec<(String, Uuid)> = all_internal_neuron_ganglia
508 .difference(&neuron_ganglia)
509 .cloned()
510 .collect();
511
512 let new_external_combinations: Vec<(String, Uuid)> = all_external_neuron_ganglia
513 .difference(&neuron_ganglia)
514 .cloned()
515 .collect();
516
517 *neuron_ganglia = all_internal_neuron_ganglia
518 .union(&all_external_neuron_ganglia)
519 .cloned()
520 .collect();
521
522 drop(neuron_ganglia);
523
524 let mut internal_reactants_by_ganglion: HashMap<
525 (String, Uuid),
526 Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
527 > = HashMap::new();
528
529 for (neuron_name, ganglion_id) in new_internal_combinations {
530 let internal_reactant = reactant_factories_map
531 .read()
532 .await
533 .get(&neuron_name)
534 .ok_or_else(|| PlexusError::ReactantCreation {
535 neuron_name: neuron_name.clone(),
536 })?
537 .internal_factory
538 .create_reactant(
539 ganglion_id,
540 internal_ganglia_map.clone(),
541 external_ganglia_map.clone(),
542 reactions.clone(),
543 );
544
545 internal_reactants_by_ganglion
546 .entry((neuron_name, ganglion_id))
547 .or_default()
548 .push(internal_reactant);
549 }
550
551 for ((neuron_name, ganglion_id), reactants) in internal_reactants_by_ganglion {
552 if let Some(ganglion_mutex) = internal_ganglia.get(&ganglion_id) {
553 let mut ganglion = ganglion_mutex.lock().await;
554 ganglion.react(neuron_name, reactants, vec![]).await?;
555 }
556 }
557
558 #[allow(clippy::type_complexity)]
559 let mut external_reactants_by_ganglion: HashMap<
560 (String, Uuid),
561 (
562 Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
563 Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
564 ),
565 > = HashMap::new();
566
567 for (neuron_name, ganglion_id) in new_external_combinations {
568 let external_internal_reactant = reactant_factories_map
569 .read()
570 .await
571 .get(&neuron_name)
572 .ok_or_else(|| PlexusError::ReactantCreation {
573 neuron_name: neuron_name.clone(),
574 })?
575 .external_internal_factory
576 .create_reactant(ganglion_id, internal_ganglia_map.clone(), reactions.clone());
577
578 let external_external_reactant = reactant_factories_map
579 .read()
580 .await
581 .get(&neuron_name)
582 .ok_or_else(|| PlexusError::ReactantCreation {
583 neuron_name: neuron_name.clone(),
584 })?
585 .external_external_factory
586 .create_reactant(ganglion_id, external_ganglia_map.clone(), reactions.clone());
587
588 let entry = external_reactants_by_ganglion
589 .entry((neuron_name.clone(), ganglion_id))
590 .or_default();
591 entry.0.push(external_internal_reactant);
592 entry.1.push(external_external_reactant);
593 }
594
595 for ((neuron_name, ganglion_id), (reactants, reactants_raw)) in external_reactants_by_ganglion {
596 if let Some(ganglion_mutex) = external_ganglia.get(&ganglion_id) {
597 let mut ganglion = ganglion_mutex.lock().await;
598 ganglion
599 .react(neuron_name, reactants, reactants_raw, vec![])
600 .await?;
601 }
602 }
603
604 Ok(())
605}
606
607impl GanglionInternal for Plexus {
608 fn transmit(
609 &mut self,
610 payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
611 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>> {
612 let inproc_ganglion = self.inproc_ganglion.clone();
613
614 Box::pin(async move {
615 let future = {
616 let mut ganglion = inproc_ganglion.lock().await;
617 ganglion.transmit(payload)
618 };
619
620 future.await
621 })
622 }
623
624 fn react(
625 &mut self,
626 neuron_name: String,
627 reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
628 error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
629 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
630 let inproc_ganglion = self.inproc_ganglion.clone();
631
632 Box::pin(async move {
633 let future = {
634 let mut ganglion = inproc_ganglion.lock().await;
635 ganglion.react(neuron_name, reactants, error_reactants)
636 };
637
638 future.await
639 })
640 }
641
642 fn react_many(
643 &mut self,
644 reactions: HashMap<
645 String,
646 (
647 Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
648 Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
649 ),
650 >,
651 ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
652 let inproc_ganglion = self.inproc_ganglion.clone();
653
654 Box::pin(async move {
655 let mut ganglion = inproc_ganglion.lock().await;
656 ganglion.react_many(reactions).await
657 })
658 }
659
660 fn unique_id(&self) -> Uuid {
661 self.id
662 }
663}
664
665pub trait ErasedInternalReactantFactory: Send + Sync + 'static {
668 #[allow(clippy::type_complexity)]
669 fn create_reactant(
670 &self,
671 current_ganglion_id: Uuid,
672 internal_ganglia: Arc<
673 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
674 >,
675 external_ganglia: Arc<
676 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
677 >,
678 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
679 ) -> Arc<dyn ReactantErased + Send + Sync + 'static>;
680}
681
682pub trait ErasedExternalInternalReactantFactory: Send + Sync + 'static {
683 #[allow(clippy::type_complexity)]
684 fn create_reactant(
685 &self,
686 current_ganglion_id: Uuid,
687 internal_ganglia: Arc<
688 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
689 >,
690 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
691 ) -> Arc<dyn ReactantErased + Send + Sync + 'static>;
692}
693
694pub trait ErasedExternalExternalReactantFactory: Send + Sync + 'static {
695 #[allow(clippy::type_complexity)]
696 fn create_reactant(
697 &self,
698 current_ganglion_id: Uuid,
699 external_ganglia: Arc<
700 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
701 >,
702 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
703 ) -> Arc<dyn ReactantRawErased + Send + Sync + 'static>;
704}
705
/// Stateless factory for [`PlexusInternalReactant`]s of payload type `T` and codec `C`.
///
/// The type parameters are carried only through `PhantomData`; the value stores no data.
pub struct PlexusInternalReactantFactory<T, C> {
    _phantom: PhantomData<(T, C)>,
}

impl<T, C> Default for PlexusInternalReactantFactory<T, C> {
    /// Builds the (stateless) factory.
    fn default() -> Self {
        Self {
            _phantom: PhantomData,
        }
    }
}

impl<T, C> PlexusInternalReactantFactory<T, C> {
    /// Builds the (stateless) factory; equivalent to `Default::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
723
724impl<T, C> ErasedInternalReactantFactory for PlexusInternalReactantFactory<T, C>
725where
726 C: Codec<T> + CodecName + Send + Sync + 'static,
727 T: Send + Sync + 'static,
728{
729 fn create_reactant(
730 &self,
731 current_ganglion_id: Uuid,
732 internal_ganglia: Arc<
733 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
734 >,
735 external_ganglia: Arc<
736 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
737 >,
738 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
739 ) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
740 erase_reactant(Box::new(PlexusInternalReactant::<T, C>::new(
741 current_ganglion_id,
742 internal_ganglia,
743 external_ganglia,
744 reactions,
745 )))
746 }
747}
748
/// Stateless factory for [`PlexusExternalInternalReactant`]s of payload type `T` and
/// codec `C`.
///
/// The type parameters are carried only through `PhantomData`; the value stores no data.
pub struct PlexusExternalInternalReactantFactory<T, C> {
    _phantom: PhantomData<(T, C)>,
}

impl<T, C> Default for PlexusExternalInternalReactantFactory<T, C> {
    /// Builds the (stateless) factory.
    fn default() -> Self {
        Self {
            _phantom: PhantomData,
        }
    }
}

impl<T, C> PlexusExternalInternalReactantFactory<T, C> {
    /// Builds the (stateless) factory; equivalent to `Default::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
766
767impl<T, C> ErasedExternalInternalReactantFactory for PlexusExternalInternalReactantFactory<T, C>
768where
769 C: Codec<T> + CodecName + Send + Sync + 'static,
770 T: Send + Sync + 'static,
771{
772 fn create_reactant(
773 &self,
774 current_ganglion_id: Uuid,
775 internal_ganglia: Arc<
776 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
777 >,
778 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
779 ) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
780 erase_reactant(Box::new(PlexusExternalInternalReactant::<T, C>::new(
781 current_ganglion_id,
782 internal_ganglia,
783 reactions,
784 )))
785 }
786}
787
/// Stateless factory for [`PlexusExternalExternalReactant`]s of payload type `T` and
/// codec `C`.
///
/// The type parameters are carried only through `PhantomData`; the value stores no data.
pub struct PlexusExternalExternalReactantFactory<T, C> {
    _phantom: PhantomData<(T, C)>,
}

impl<T, C> Default for PlexusExternalExternalReactantFactory<T, C> {
    /// Builds the (stateless) factory.
    fn default() -> Self {
        Self {
            _phantom: PhantomData,
        }
    }
}

impl<T, C> PlexusExternalExternalReactantFactory<T, C> {
    /// Builds the (stateless) factory; equivalent to `Default::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
805
806impl<T, C> ErasedExternalExternalReactantFactory for PlexusExternalExternalReactantFactory<T, C>
807where
808 C: Codec<T> + CodecName + Send + Sync + 'static,
809 T: Send + Sync + 'static,
810{
811 fn create_reactant(
812 &self,
813 current_ganglion_id: Uuid,
814 external_ganglia: Arc<
815 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
816 >,
817 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
818 ) -> Arc<dyn ReactantRawErased + Send + Sync + 'static> {
819 erase_reactant_raw(Box::new(PlexusExternalExternalReactant::<T, C>::new(
820 current_ganglion_id,
821 external_ganglia,
822 reactions,
823 )))
824 }
825}
826
827#[allow(clippy::type_complexity)]
828pub struct PlexusInternalReactant<T, C>
829where
830 C: Codec<T> + CodecName + Send + Sync + 'static,
831 T: Send + Sync + 'static,
832{
833 current_ganglion_id: Uuid,
834 internal_ganglia:
835 Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>>,
836 external_ganglia:
837 Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>>,
838 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
839 _phantom: PhantomData<(T, C)>,
840}
841
842impl<T, C> Clone for PlexusInternalReactant<T, C>
843where
844 C: Codec<T> + CodecName + Send + Sync + 'static,
845 T: Send + Sync + 'static,
846{
847 fn clone(&self) -> Self {
848 Self {
849 current_ganglion_id: self.current_ganglion_id,
850 internal_ganglia: self.internal_ganglia.clone(),
851 external_ganglia: self.external_ganglia.clone(),
852 reactions: self.reactions.clone(),
853 _phantom: self._phantom,
854 }
855 }
856}
857
858impl<T, C> PlexusInternalReactant<T, C>
859where
860 C: Codec<T> + CodecName + Send + Sync + 'static,
861 T: Send + Sync + 'static,
862{
863 #[allow(clippy::type_complexity)]
864 pub fn new(
865 current_ganglion_id: Uuid,
866 internal_ganglia: Arc<
867 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
868 >,
869 external_ganglia: Arc<
870 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
871 >,
872 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
873 ) -> Self {
874 Self {
875 current_ganglion_id,
876 internal_ganglia,
877 external_ganglia,
878 reactions,
879 _phantom: PhantomData,
880 }
881 }
882}
883
884impl<T, C> Reactant<T, C> for PlexusInternalReactant<T, C>
885where
886 C: Codec<T> + CodecName + Send + Sync + 'static,
887 T: Send + Sync + 'static,
888{
889 fn react(
890 &self,
891 payload: Arc<Payload<T, C>>,
892 ) -> Pin<Box<dyn Future<Output = Result<(), crate::reactant::ReactantError>> + Send + 'static>> {
893 let current_ganglion_id = self.current_ganglion_id;
894 let internal_ganglia = self.internal_ganglia.clone();
895 let external_ganglia = self.external_ganglia.clone();
896 let reactions = self.reactions.clone();
897
898 let payload_clone = payload.clone();
899 Box::pin(
900 async move {
901 tracing::debug!("Starting reaction for ganglion {current_ganglion_id}");
902
903 let reaction_set_arc = reactions
904 .get_with(payload_clone.span_id(), async {
905 Arc::new(Mutex::new(HashSet::new()))
906 })
907 .await;
908
909 let reaction_set_copy = {
910 let mut set = reaction_set_arc.lock().await;
911 if set.contains(¤t_ganglion_id) {
912 tracing::debug!(
913 "Ganglion {current_ganglion_id} already processed reaction, returning early"
914 );
915 return Ok(());
916 }
917 set.insert(current_ganglion_id);
918 set.clone()
919 };
920
921 let erased_payload = erase_payload(payload_clone.clone());
922
923 type UnifiedTransmitFuture = Pin<
924 Box<
925 dyn Future<Output = Result<Vec<()>, crate::ganglion::GanglionError>> + Send,
926 >,
927 >;
928
929 let internal_ganglia_guard = internal_ganglia.read().await;
930 let internal_ganglia_to_process: Vec<_> = internal_ganglia_guard
931 .iter()
932 .filter(|(ganglion_id, _ganglion)| {
933 *ganglion_id != ¤t_ganglion_id
934 && !reaction_set_copy.contains(ganglion_id)
935 })
936 .map(|(_ganglion_id, ganglion)| ganglion.clone())
937 .collect();
938 drop(internal_ganglia_guard);
939
940 let external_ganglia_guard = external_ganglia.read().await;
941 let external_ganglia_to_process: Vec<_> = external_ganglia_guard
942 .iter()
943 .filter(|(ganglion_id, _ganglion)| !reaction_set_copy.contains(ganglion_id))
944 .map(|(_ganglion_id, ganglion)| ganglion.clone())
945 .collect();
946 drop(external_ganglia_guard);
947
948 let internal_futures =
949 internal_ganglia_to_process
950 .into_iter()
951 .map(|ganglion_mutex| {
952 let payload = erased_payload.clone();
953 Box::pin(async move {
954 let mut ganglion = ganglion_mutex.lock().await;
955 ganglion.transmit(payload.clone()).await
956 }) as UnifiedTransmitFuture
957 });
958
959 let external_futures =
960 external_ganglia_to_process
961 .into_iter()
962 .map(|ganglion_mutex| {
963 let payload = erased_payload.clone();
964 Box::pin(async move {
965 let mut ganglion = ganglion_mutex.lock().await;
966 ganglion.transmit(payload.clone()).await
967 }) as UnifiedTransmitFuture
968 });
969
970 let all_futures: Vec<_> = internal_futures.chain(external_futures).collect();
971 let _results = join_all(all_futures).await;
972 Ok(())
973 }
974 .instrument(payload.span_debug("PlexusInternalReactant::react")),
975 )
976 }
977
978 fn erase(self: Box<Self>) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
979 erase_reactant(self)
980 }
981}
982
983#[allow(clippy::type_complexity)]
985pub struct PlexusExternalInternalReactant<T, C>
986where
987 C: Codec<T> + CodecName + Send + Sync + 'static,
988 T: Send + Sync + 'static,
989{
990 current_ganglion_id: Uuid,
991 internal_ganglia:
992 Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>>,
993 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
994 _phantom: PhantomData<(T, C)>,
995}
996
997impl<T, C> Clone for PlexusExternalInternalReactant<T, C>
998where
999 C: Codec<T> + CodecName + Send + Sync + 'static,
1000 T: Send + Sync + 'static,
1001{
1002 fn clone(&self) -> Self {
1003 Self {
1004 current_ganglion_id: self.current_ganglion_id,
1005 internal_ganglia: self.internal_ganglia.clone(),
1006 reactions: self.reactions.clone(),
1007 _phantom: self._phantom,
1008 }
1009 }
1010}
1011
1012impl<T, C> PlexusExternalInternalReactant<T, C>
1013where
1014 C: Codec<T> + CodecName + Send + Sync + 'static,
1015 T: Send + Sync + 'static,
1016{
1017 #[allow(clippy::type_complexity)]
1018 pub fn new(
1019 current_ganglion_id: Uuid,
1020 internal_ganglia: Arc<
1021 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
1022 >,
1023 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
1024 ) -> Self {
1025 Self {
1026 current_ganglion_id,
1027 internal_ganglia,
1028 reactions,
1029 _phantom: PhantomData,
1030 }
1031 }
1032}
1033
1034impl<T, C> Reactant<T, C> for PlexusExternalInternalReactant<T, C>
1035where
1036 C: Codec<T> + CodecName + Send + Sync + 'static,
1037 T: Send + Sync + 'static,
1038{
1039 fn react(
1040 &self,
1041 payload: Arc<Payload<T, C>>,
1042 ) -> Pin<Box<dyn Future<Output = Result<(), crate::reactant::ReactantError>> + Send + 'static>> {
1043 let current_ganglion_id = self.current_ganglion_id;
1044 let internal_ganglia = self.internal_ganglia.clone();
1045 let reactions = self.reactions.clone();
1046
1047 let payload_clone = payload.clone();
1048 Box::pin(
1049 async move {
1050 tracing::debug!("Starting reaction for ganglion {current_ganglion_id}");
1051
1052 let reaction_set_arc = reactions
1053 .get_with(payload_clone.span_id(), async {
1054 Arc::new(Mutex::new(HashSet::new()))
1055 })
1056 .await;
1057
1058 let reaction_set_copy = {
1059 let mut set = reaction_set_arc.lock().await;
1060 if set.contains(¤t_ganglion_id) {
1061 tracing::debug!(
1062 "Ganglion {current_ganglion_id} already processed reaction, returning early"
1063 );
1064 return Ok(());
1065 }
1066 set.insert(current_ganglion_id);
1067 set.clone()
1068 };
1069
1070 let erased_payload = erase_payload(payload_clone.clone());
1071
1072 let internal_ganglia_guard = internal_ganglia.read().await;
1073 let internal_ganglia_to_process: Vec<_> = internal_ganglia_guard
1074 .iter()
1075 .filter(|(ganglion_id, _ganglion)| {
1076 !reaction_set_copy.contains(ganglion_id)
1077 })
1078 .map(|(_ganglion_id, ganglion)| ganglion.clone())
1079 .collect();
1080 drop(internal_ganglia_guard);
1081
1082 let internal_futures = internal_ganglia_to_process.into_iter().map(
1083 |ganglion_mutex| {
1084 let payload = erased_payload.clone();
1085 Box::pin(async move {
1086 let mut ganglion = ganglion_mutex.lock().await;
1087 ganglion.transmit(payload.clone()).await
1088 })
1089 },
1090 );
1091
1092 let _results = join_all(internal_futures).await;
1093 Ok(())
1094 }
1095 .instrument(payload.span_debug("PlexusExternalInternalReactant::react")),
1096 )
1097 }
1098
1099 fn erase(self: Box<Self>) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
1100 erase_reactant(self)
1101 }
1102}
1103
1104#[allow(clippy::type_complexity)]
1106pub struct PlexusExternalExternalReactant<T, C>
1107where
1108 C: Codec<T> + CodecName + Send + Sync + 'static,
1109 T: Send + Sync + 'static,
1110{
1111 current_ganglion_id: Uuid,
1112 external_ganglia:
1113 Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>>,
1114 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
1115 _phantom: PhantomData<(T, C)>,
1116}
1117
1118impl<T, C> Clone for PlexusExternalExternalReactant<T, C>
1119where
1120 C: Codec<T> + CodecName + Send + Sync + 'static,
1121 T: Send + Sync + 'static,
1122{
1123 fn clone(&self) -> Self {
1124 Self {
1125 current_ganglion_id: self.current_ganglion_id,
1126 external_ganglia: self.external_ganglia.clone(),
1127 reactions: self.reactions.clone(),
1128 _phantom: self._phantom,
1129 }
1130 }
1131}
1132
1133impl<T, C> PlexusExternalExternalReactant<T, C>
1134where
1135 C: Codec<T> + CodecName + Send + Sync + 'static,
1136 T: Send + Sync + 'static,
1137{
1138 #[allow(clippy::type_complexity)]
1139 pub fn new(
1140 current_ganglion_id: Uuid,
1141 external_ganglia: Arc<
1142 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
1143 >,
1144 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
1145 ) -> Self {
1146 Self {
1147 current_ganglion_id,
1148 external_ganglia,
1149 reactions,
1150 _phantom: PhantomData,
1151 }
1152 }
1153}
1154
1155impl<T, C> ReactantRaw<T, C> for PlexusExternalExternalReactant<T, C>
1156where
1157 C: Codec<T> + CodecName + Send + Sync + 'static,
1158 T: Send + Sync + 'static,
1159{
1160 fn react(
1161 &self,
1162 payload: Arc<PayloadRaw<T, C>>,
1163 ) -> Pin<Box<dyn Future<Output = Result<(), crate::reactant::ReactantError>> + Send + 'static>> {
1164 let current_ganglion_id = self.current_ganglion_id;
1165 let external_ganglia = self.external_ganglia.clone();
1166 let reactions = self.reactions.clone();
1167
1168 let payload_clone = payload.clone();
1169 Box::pin(
1170 async move {
1171 tracing::debug!("Starting reaction for ganglion {current_ganglion_id}");
1172
1173 let reaction_set_arc = reactions
1174 .get_with(payload_clone.span_id(), async {
1175 Arc::new(Mutex::new(HashSet::new()))
1176 })
1177 .await;
1178
1179 let reaction_set_copy = {
1180 let mut set = reaction_set_arc.lock().await;
1181 if set.contains(¤t_ganglion_id) {
1182 tracing::debug!(
1183 "PlexusExternalExternalReactant::react - Ganglion {current_ganglion_id} already processed reaction, returning early"
1184 );
1185 return Ok(());
1186 }
1187 set.insert(current_ganglion_id);
1188 set.clone()
1189 };
1190
1191 let erased_payload = erase_payload_raw(payload_clone.clone());
1192
1193 let external_ganglia_guard = external_ganglia.read().await;
1194 let external_ganglia_to_process: Vec<_> = external_ganglia_guard
1195 .iter()
1196 .filter(|(ganglion_id, _ganglion)| {
1197 *ganglion_id != ¤t_ganglion_id
1198 && !reaction_set_copy.contains(ganglion_id)
1199 })
1200 .map(|(_ganglion_id, ganglion)| ganglion.clone())
1201 .collect();
1202 drop(external_ganglia_guard);
1203
1204 let external_futures =
1205 external_ganglia_to_process
1206 .into_iter()
1207 .map(|ganglion_mutex| {
1208 let payload = erased_payload.clone();
1209 Box::pin(async move {
1210 let mut ganglion = ganglion_mutex.lock().await;
1211 ganglion.transmit_encoded(payload.clone()).await
1212 })
1213 });
1214
1215 let _results = join_all(external_futures).await;
1216 Ok(())
1217 }
1218 .instrument(payload.span_debug("PlexusExternalExternalReactant::react")),
1219 )
1220 }
1221
1222 fn erase_raw(self: Box<Self>) -> Arc<dyn ReactantRawErased + Send + Sync + 'static> {
1223 erase_reactant_raw(self)
1224 }
1225}
1226
1227#[cfg(test)]
1228mod tests {
1229 use super::*;
1230 use crate::erasure::payload::{erase_payload, erase_payload_raw};
1231 use crate::erasure::reactant::{erase_reactant, erase_reactant_raw};
1232 use crate::logging::TraceContext;
1233 use crate::neuron::NeuronImpl;
1234 use crate::payload::{Payload, PayloadRaw};
1235 use crate::test_utils::{
1236 DebugCodec, DebugStruct, GanglionExternalInprocess, TokioMpscReactant,
1237 TokioMpscReactantRaw, test_namespace,
1238 };
1239 use tokio::sync::mpsc::channel;
1240 use uuid::Uuid;
1241
1242 #[test]
1243 fn test_cartesian_product_equivalence() {
1244 let neurons = [("neuron1", "Neuron1"), ("neuron2", "Neuron2")];
1245 let ganglia = [("ganglion1", "uuid1"), ("ganglion2", "uuid2")];
1246
1247 let mut old_combinations = HashSet::new();
1248 for (neuron_name, _neuron) in neurons.iter() {
1249 for (ganglion_id, _ganglion_mutex) in ganglia.iter() {
1250 old_combinations.insert((neuron_name.to_string(), ganglion_id.to_string()));
1251 }
1252 }
1253
1254 let new_combinations: HashSet<(String, String)> = neurons
1255 .iter()
1256 .cartesian_product(ganglia.iter())
1257 .map(|((neuron_name, _neuron), (ganglion_id, _ganglion_mutex))| {
1258 (neuron_name.to_string(), ganglion_id.to_string())
1259 })
1260 .collect();
1261
1262 assert_eq!(old_combinations, new_combinations);
1263 assert_eq!(old_combinations.len(), 4);
1264 }
1265
1266 #[tokio::test]
1267 async fn test_plexus_creation() {
1268 let plexus: Plexus = Plexus::new(vec![], vec![]).await;
1269
1270 assert_eq!(plexus.relevant_neurons.len(), 0);
1271 assert_eq!(plexus.ignored_neurons.len(), 0);
1272 }
1273
1274 #[tokio::test]
1275 async fn test_plexus_capable() {
1276 let ns = test_namespace();
1277 let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;
1278
1279 let neuron: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1280 let neuron_arc = Arc::new(neuron);
1281
1282 assert!(plexus.capable(neuron_arc));
1283 }
1284
1285 #[tokio::test]
1286 async fn test_plexus_adapt() {
1287 let ns = test_namespace();
1288 let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;
1289
1290 let neuron: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1291
1292 plexus
1293 .adapt(Arc::new(neuron))
1294 .await
1295 .expect("Failed to adapt neuron");
1296
1297 let neurons = plexus.neurons.read().await;
1298 assert_eq!(neurons.len(), 1);
1299 }
1300
1301 #[tokio::test]
1302 async fn test_plexus_ganglion_internal_adapt() {
1303 let ns = test_namespace();
1304 let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;
1305
1306 let neuron: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1307 let neuron_arc = Arc::new(neuron);
1308
1309 assert!(plexus.capable(neuron_arc.clone()));
1310
1311 plexus
1312 .adapt(neuron_arc)
1313 .await
1314 .expect("Failed to adapt neuron");
1315
1316 let neurons = plexus.neurons.read().await;
1317 assert_eq!(neurons.len(), 1);
1318 }
1319
1320 #[tokio::test]
1321 async fn test_plexus_external_inprocess_transmit_via_adapt() {
1322 let ns = test_namespace();
1323
1324 let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
1325 let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
1326 let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
1327 let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
1328 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1329 let neuron_arc = neuron_impl.clone_to_arc();
1330
1331 let mut ganglion_inprocess = GanglionInprocess::new();
1332 let mut ganglion_external_inprocess = GanglionExternalInprocess::new();
1333
1334 let _ = ganglion_inprocess.adapt(neuron_arc.clone()).await;
1335 ganglion_external_inprocess
1336 .adapt(neuron_arc.clone())
1337 .await
1338 .expect("Failed to adapt neuron");
1339
1340 let erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
1341 erase_reactant(Box::new(TokioMpscReactant::new(tx1))),
1342 erase_reactant(Box::new(TokioMpscReactant::new(tx2))),
1343 ];
1344
1345 let erased_raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
1346 erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx1))),
1347 erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx2))),
1348 ];
1349
1350 ganglion_inprocess
1351 .react(neuron_arc.name(), erased_reactants, vec![])
1352 .await
1353 .expect("Failed to react");
1354 ganglion_external_inprocess
1355 .react(neuron_arc.name(), vec![], erased_raw_reactants, vec![])
1356 .await
1357 .expect("Failed to react raw");
1358
1359 let test_payload = Payload::with_correlation(
1360 DebugStruct {
1361 foo: 42,
1362 bar: "test".to_string(),
1363 },
1364 neuron_arc.clone(),
1365 None,
1366 );
1367
1368 let test_payload_raw = PayloadRaw::with_correlation(
1369 DebugCodec::encode(&DebugStruct {
1370 foo: 42,
1371 bar: "test".to_string(),
1372 })
1373 .expect("Failed to encode test data"),
1374 neuron_arc.clone(),
1375 None,
1376 );
1377
1378 let erased_payload = erase_payload(test_payload);
1379 ganglion_inprocess
1380 .transmit(erased_payload)
1381 .await
1382 .expect("Failed to transmit");
1383
1384 let erased_payload_raw = erase_payload_raw(test_payload_raw);
1385 ganglion_external_inprocess
1386 .transmit_encoded(erased_payload_raw)
1387 .await
1388 .expect("Failed to transmit encoded");
1389
1390 let received1 = rx1.recv().await.expect("Failed to receive payload 1");
1391 let received2 = rx2.recv().await.expect("Failed to receive payload 2");
1392 let raw_received1 = raw_rx1
1393 .recv()
1394 .await
1395 .expect("Failed to receive raw payload 1");
1396 let raw_received2 = raw_rx2
1397 .recv()
1398 .await
1399 .expect("Failed to receive raw payload 2");
1400
1401 assert_eq!(received1.value.foo, 42);
1402 assert_eq!(received2.value.foo, 42);
1403 let decoded1 =
1404 DebugCodec::decode(&raw_received1.value).expect("Failed to decode raw payload 1");
1405 let decoded2 =
1406 DebugCodec::decode(&raw_received2.value).expect("Failed to decode raw payload 2");
1407 assert_eq!(decoded1.foo, 42);
1408 assert_eq!(decoded2.foo, 42);
1409 }
1410
1411 #[tokio::test]
1412 async fn test_plexus_external_inprocess_across_threads() {
1413 let ns = test_namespace();
1414
1415 let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
1416 let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
1417 let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
1418 let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
1419 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1420 let neuron_arc = neuron_impl.clone_to_arc();
1421
1422 let ganglion_inprocess = Arc::new(Mutex::new(GanglionInprocess::new()));
1423 let ganglion_external_inprocess = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
1424
1425 {
1426 let mut ganglion = ganglion_inprocess.lock().await;
1427 let _ = ganglion.adapt(neuron_arc.clone()).await;
1428 }
1429 {
1430 let mut ganglion = ganglion_external_inprocess.lock().await;
1431 ganglion
1432 .adapt(neuron_arc.clone())
1433 .await
1434 .expect("Failed to adapt neuron");
1435 }
1436
1437 let erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
1438 erase_reactant(Box::new(TokioMpscReactant::new(tx1))),
1439 erase_reactant(Box::new(TokioMpscReactant::new(tx2))),
1440 ];
1441
1442 let erased_raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
1443 erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx1))),
1444 erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx2))),
1445 ];
1446
1447 {
1448 let mut ganglion = ganglion_inprocess.lock().await;
1449 ganglion
1450 .react(neuron_arc.name(), erased_reactants, vec![])
1451 .await
1452 .expect("Failed to react");
1453 }
1454 {
1455 let mut ganglion = ganglion_external_inprocess.lock().await;
1456 ganglion
1457 .react(neuron_arc.name(), vec![], erased_raw_reactants, vec![])
1458 .await
1459 .expect("Failed to react raw");
1460 }
1461
1462 let ganglion_inprocess_clone = ganglion_inprocess.clone();
1463 let ganglion_external_inprocess_clone = ganglion_external_inprocess.clone();
1464
1465 let neuron_arc_clone = neuron_arc.clone();
1466 let task1 = tokio::task::spawn(async move {
1467 let test_payload = Payload::with_correlation(
1468 DebugStruct {
1469 foo: 42,
1470 bar: "test".to_string(),
1471 },
1472 neuron_arc_clone.clone(),
1473 None,
1474 );
1475
1476 let erased_payload = erase_payload(test_payload);
1477 let mut ganglion = ganglion_inprocess_clone.lock().await;
1478 ganglion
1479 .transmit(erased_payload)
1480 .await
1481 .expect("Failed to transmit");
1482 });
1483
1484 let neuron_arc_clone2 = neuron_arc.clone();
1485 let task2 = tokio::task::spawn(async move {
1486 let test_payload_raw = PayloadRaw::with_correlation(
1487 DebugCodec::encode(&DebugStruct {
1488 foo: 42,
1489 bar: "test".to_string(),
1490 })
1491 .expect("Failed to encode test data"),
1492 neuron_arc_clone2.clone(),
1493 None,
1494 );
1495
1496 let erased_payload_raw = erase_payload_raw(test_payload_raw);
1497 let mut ganglion = ganglion_external_inprocess_clone.lock().await;
1498 ganglion
1499 .transmit_encoded(erased_payload_raw)
1500 .await
1501 .expect("Failed to transmit encoded");
1502 });
1503
1504 task1.await.expect("Task 1 failed");
1505 task2.await.expect("Task 2 failed");
1506
1507 let received1 = rx1.recv().await.expect("Failed to receive payload 1");
1508 let received2 = rx2.recv().await.expect("Failed to receive payload 2");
1509 let raw_received1 = raw_rx1
1510 .recv()
1511 .await
1512 .expect("Failed to receive raw payload 1");
1513 let raw_received2 = raw_rx2
1514 .recv()
1515 .await
1516 .expect("Failed to receive raw payload 2");
1517
1518 assert_eq!(received1.value.foo, 42);
1519 assert_eq!(received2.value.foo, 42);
1520 let decoded1 =
1521 DebugCodec::decode(&raw_received1.value).expect("Failed to decode raw payload 1");
1522 let decoded2 =
1523 DebugCodec::decode(&raw_received2.value).expect("Failed to decode raw payload 2");
1524 assert_eq!(decoded1.foo, 42);
1525 assert_eq!(decoded2.foo, 42);
1526 }
1527
1528 #[tokio::test]
1529 async fn test_plexus_transmit_encoded_external_to_internal() {
1530 let ns = test_namespace();
1531 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
1532 let neuron_arc = neuron_impl.clone_to_arc();
1533
1534 let (tx_internal, mut rx_internal) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
1535
1536 let external_ganglion = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
1537 let mut internal_plexus = Plexus::new(vec![erase_neuron(neuron_arc.clone())], vec![]).await;
1538
1539 {
1540 let mut g = external_ganglion.lock().await;
1541 g.adapt(neuron_arc.clone())
1542 .await
1543 .expect("Failed to adapt neuron to external ganglion");
1544 }
1545 internal_plexus
1546 .adapt(neuron_arc.clone())
1547 .await
1548 .expect("Failed to adapt neuron to internal plexus");
1549
1550 let reactants = vec![erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(
1551 TokioMpscReactant::new(tx_internal),
1552 ))];
1553 internal_plexus
1554 .react(neuron_arc.name(), reactants, vec![])
1555 .await
1556 .expect("Failed to add reactant to internal plexus");
1557
1558 let _ = internal_plexus
1559 .infuse_external_ganglion(external_ganglion.clone())
1560 .await;
1561
1562 let test_data = DebugStruct {
1563 foo: 123,
1564 bar: "plexus_transmit_encoded_test".to_string(),
1565 };
1566 let encoded_data = neuron_impl
1567 .encode(&test_data)
1568 .expect("Failed to encode test data");
1569 let correlation_id = Uuid::now_v7();
1570
1571 let span_id = Uuid::now_v7().as_u128() as u64;
1572 let payload_raw = Arc::new(PayloadRaw {
1573 value: Arc::new(encoded_data),
1574 neuron: neuron_arc.clone(),
1575 trace: TraceContext::from_parts(correlation_id, span_id, None),
1576 });
1577
1578 {
1579 let mut g = external_ganglion.lock().await;
1580 let erased_payload_raw = erase_payload_raw(payload_raw.clone());
1581 g.transmit_encoded(erased_payload_raw)
1582 .await
1583 .expect("Failed to transmit encoded payload");
1584 }
1585
1586 tokio::time::sleep(Duration::from_millis(100)).await;
1587
1588 assert_eq!(
1589 rx_internal.len(),
1590 1,
1591 "Internal plexus should have received exactly one payload"
1592 );
1593 let received_payload = rx_internal
1594 .recv()
1595 .await
1596 .expect("Should receive decoded payload in internal plexus");
1597
1598 assert_eq!(
1599 received_payload.value.foo, test_data.foo,
1600 "Decoded payload should have correct foo value"
1601 );
1602 assert_eq!(
1603 received_payload.value.bar, test_data.bar,
1604 "Decoded payload should have correct bar value"
1605 );
1606 assert_eq!(
1607 received_payload.correlation_id(), correlation_id,
1608 "Correlation ID should match"
1609 );
1610 }
1611
1612 #[tokio::test]
1613 async fn test_plexus_ganglion_excision() {
1614 let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;
1615
1616 let internal_ganglion_1 = Arc::new(Mutex::new(GanglionInprocess::new()));
1617 let internal_id_1 = { internal_ganglion_1.lock().await.unique_id() };
1618
1619 plexus
1620 .infuse_ganglion(internal_ganglion_1.clone())
1621 .await
1622 .expect("Failed to infuse internal ganglion 1");
1623
1624 {
1625 let internal_ganglia = plexus.internal_ganglia.read().await;
1626 assert!(internal_ganglia.contains_key(&internal_id_1));
1627 }
1628
1629 let excised_internal_1 = plexus
1630 .excise_ganglion_by_id(internal_id_1)
1631 .await
1632 .expect("Failed to excise internal ganglion 1 by ID");
1633 assert!(excised_internal_1.is_some());
1634 assert_eq!(
1635 excised_internal_1.unwrap().lock().await.unique_id(),
1636 internal_id_1
1637 );
1638
1639 {
1640 let internal_ganglia = plexus.internal_ganglia.read().await;
1641 assert!(!internal_ganglia.contains_key(&internal_id_1));
1642 }
1643
1644 let internal_ganglion_2 = Arc::new(Mutex::new(GanglionInprocess::new()));
1645 let internal_id_2 = { internal_ganglion_2.lock().await.unique_id() };
1646
1647 plexus
1648 .infuse_ganglion(internal_ganglion_2.clone())
1649 .await
1650 .expect("Failed to infuse internal ganglion 2");
1651
1652 let excised_internal_2 = plexus
1653 .excise_ganglion(internal_ganglion_2.clone())
1654 .await
1655 .expect("Failed to excise internal ganglion 2 by instance");
1656 assert!(excised_internal_2.is_some());
1657 assert_eq!(
1658 excised_internal_2.unwrap().lock().await.unique_id(),
1659 internal_id_2
1660 );
1661
1662 {
1663 let internal_ganglia = plexus.internal_ganglia.read().await;
1664 assert!(!internal_ganglia.contains_key(&internal_id_2));
1665 }
1666
1667 let external_ganglion_1 = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
1668 let external_id_1 = { external_ganglion_1.lock().await.unique_id() };
1669
1670 plexus
1671 .infuse_external_ganglion(external_ganglion_1.clone())
1672 .await
1673 .expect("Failed to infuse external ganglion 1");
1674
1675 {
1676 let external_ganglia = plexus.external_ganglia.read().await;
1677 assert!(external_ganglia.contains_key(&external_id_1));
1678 }
1679
1680 let excised_external_1 = plexus
1681 .excise_external_ganglion_by_id(external_id_1)
1682 .await
1683 .expect("Failed to excise external ganglion 1 by ID");
1684 assert!(excised_external_1.is_some());
1685 assert_eq!(
1686 excised_external_1.unwrap().lock().await.unique_id(),
1687 external_id_1
1688 );
1689
1690 {
1691 let external_ganglia = plexus.external_ganglia.read().await;
1692 assert!(!external_ganglia.contains_key(&external_id_1));
1693 }
1694
1695 let external_ganglion_2 = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
1696 let external_id_2 = { external_ganglion_2.lock().await.unique_id() };
1697
1698 plexus
1699 .infuse_external_ganglion(external_ganglion_2.clone())
1700 .await
1701 .expect("Failed to infuse external ganglion 2");
1702
1703 let excised_external_2 = plexus
1704 .excise_external_ganglion(external_ganglion_2.clone())
1705 .await
1706 .expect("Failed to excise external ganglion 2 by instance");
1707 assert!(excised_external_2.is_some());
1708 assert_eq!(
1709 excised_external_2.unwrap().lock().await.unique_id(),
1710 external_id_2
1711 );
1712
1713 {
1714 let external_ganglia = plexus.external_ganglia.read().await;
1715 assert!(!external_ganglia.contains_key(&external_id_2));
1716 }
1717 }
1718}
1719