use crate::axon::AxonError;
use crate::codec::{Codec, CodecName};
use crate::erasure::neuron::{NeuronErased, erase_neuron};
use crate::erasure::payload::{PayloadErased, erase_payload, erase_payload_raw};
use crate::erasure::reactant::{
ErrorReactantErased, ReactantErased, ReactantRawErased, erase_reactant, erase_reactant_raw,
};
use crate::ganglion::{
Ganglion, GanglionError, GanglionExternal, GanglionInprocess, GanglionInternal,
};
use crate::logging::LogTrace;
use crate::neuron::Neuron;
use crate::payload::{Payload, PayloadRaw};
use crate::reactant::{Reactant, ReactantRaw};
use futures_util::future::join_all;
use itertools::Itertools;
use moka::future::Cache;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::{Mutex, RwLock};
use tracing::{Instrument};
use uuid::Uuid;
/// Errors surfaced by [`Plexus`] operations.
///
/// NOTE(review): the `*Lock` variants and `NeuronAdaptation` are not constructed anywhere in
/// the portion of the file reviewed here — confirm whether other modules still use them.
#[derive(Error, Debug)]
pub enum PlexusError {
/// An error reported by a ganglion; most [`AxonError`] variants are converted into this.
#[error("Ganglion error: {0}")]
Ganglion(#[from] GanglionError),
/// Failure to lock the external ganglia map.
#[error("Failed to acquire lock on external ganglia")]
ExternalGangliaLock,
/// Failure to lock the internal ganglia map.
#[error("Failed to acquire lock on internal ganglia")]
InternalGangliaLock,
/// Failure to lock the neuron registry.
#[error("Failed to acquire lock on neurons")]
NeuronsLock,
/// Failure to lock the reactant factory registry.
#[error("Failed to acquire lock on reactant factories")]
ReactantFactoriesLock,
/// Failure to lock the set of wired (neuron, ganglion) pairs.
#[error("Failed to acquire lock on neuron ganglia")]
NeuronGangliaLock,
/// Failure to lock the reaction de-duplication state.
#[error("Failed to acquire lock on reactions")]
ReactionsLock,
/// A neuron could not be adapted.
#[error("Neuron adaptation failed for {neuron_name}")]
NeuronAdaptation { neuron_name: String },
/// No reactant could be created for the neuron; returned when no reactant factories are
/// registered under `neuron_name`.
#[error("Reactant creation failed for {neuron_name}")]
ReactantCreation { neuron_name: String },
/// A transmission failed; produced from `AxonError::TransmissionTimeout`.
#[error("Transmission failed")]
Transmission,
}
/// Converts an [`AxonError`] into a [`PlexusError`].
///
/// Every variant except `TransmissionTimeout` is expressed as a [`GanglionError`] (carrying
/// over the neuron / ganglion identification fields unchanged) and wrapped in
/// [`PlexusError::Ganglion`]. `TransmissionTimeout` becomes [`PlexusError::Transmission`].
impl From<AxonError> for PlexusError {
    fn from(error: AxonError) -> Self {
        let ganglion_error = match error {
            // The only variant without a ganglion-level equivalent.
            AxonError::TransmissionTimeout => return Self::Transmission,
            AxonError::GanglionError(inner) => inner,
            AxonError::NeuronNotAdapted {
                neuron_name,
                ganglion_name,
                ganglion_id,
            } => GanglionError::SynapseNotFound {
                neuron_name,
                ganglion_name,
                ganglion_id,
            },
            AxonError::SynapseLock {
                neuron_name,
                ganglion_name,
                ganglion_id,
            } => GanglionError::SynapseLock {
                neuron_name,
                ganglion_name,
                ganglion_id,
            },
            AxonError::Transmit {
                neuron_name,
                ganglion_name,
                ganglion_id,
                message,
            } => GanglionError::Transmit {
                neuron_name,
                ganglion_name,
                ganglion_id,
                message,
            },
            AxonError::Encode {
                neuron_name,
                ganglion_name,
                ganglion_id,
            } => GanglionError::Encode {
                neuron_name,
                ganglion_name,
                ganglion_id,
            },
            AxonError::Decode {
                neuron_name,
                ganglion_name,
                ganglion_id,
            } => GanglionError::Decode {
                neuron_name,
                ganglion_name,
                ganglion_id,
            },
        };
        Self::Ganglion(ganglion_error)
    }
}
/// The three type-erased reactant factories registered for a single neuron.
///
/// `Plexus::adapt` stores one instance per neuron name; the factories are later used to build
/// the reactants attached to each newly paired ganglion.
pub struct PlexusReactantFactories {
// Builds the reactant attached to an internal ganglion.
pub internal_factory: Arc<dyn ErasedInternalReactantFactory + Send + Sync>,
// Builds the typed reactant attached to an external ganglion; it only receives the internal
// ganglia map.
pub external_internal_factory: Arc<dyn ErasedExternalInternalReactantFactory + Send + Sync>,
// Builds the raw reactant attached to an external ganglion; it only receives the external
// ganglia map.
pub external_external_factory: Arc<dyn ErasedExternalExternalReactantFactory + Send + Sync>,
}
/// Hub that connects neurons to a set of internal and external ganglia.
///
/// All registries are shared behind `Arc` so that the futures returned by the `Ganglion` /
/// `GanglionInternal` implementations can own handles to them.
#[allow(clippy::type_complexity)]
pub struct Plexus {
// Identifier of this plexus; generated with `Uuid::now_v7()` in `new` and returned by
// `unique_id`.
id: Uuid,
// In-process ganglion created in `new`; `transmit` / `react` / `react_many` delegate to it.
// It is also registered in `internal_ganglia`.
inproc_ganglion: Arc<Mutex<GanglionInprocess>>,
// External ganglia keyed by their `unique_id()`.
external_ganglia:
Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>>,
// Internal ganglia keyed by their `unique_id()`.
internal_ganglia:
Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>>,
// Registered neurons keyed by neuron name.
neurons: Arc<RwLock<HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>>>,
// Reactant factories keyed by neuron name; populated by `adapt`.
reactant_factories: Arc<RwLock<HashMap<String, PlexusReactantFactories>>>,
// (neuron name, ganglion id) pairs that have already been wired with plexus reactants.
neuron_ganglia: Arc<RwLock<HashSet<(String, Uuid)>>>,
// Per-payload de-duplication state: keyed by the payload's `span_id()`, each entry holds the
// ids of ganglia that already reacted. Entries expire after 60 seconds of idleness (see `new`).
reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
// Names of neurons this plexus is restricted to; an empty set means "no restriction".
relevant_neurons: HashSet<String>,
// Names of neurons this plexus refuses to handle.
ignored_neurons: HashSet<String>,
}
impl Plexus {
    /// Returns a read guard over the registered neurons, keyed by neuron name.
    pub async fn neurons(
        &self,
    ) -> tokio::sync::RwLockReadGuard<
        '_,
        HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>,
    > {
        self.neurons.read().await
    }

    /// Creates a plexus with a fresh in-process ganglion already registered as an internal
    /// ganglion.
    ///
    /// Only the *names* of `relevant_neurons` and `ignored_neurons` are retained; they drive
    /// the decision made by `capable`.
    pub async fn new(
        relevant_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
        ignored_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
    ) -> Self {
        let inproc = Arc::new(Mutex::new(GanglionInprocess::new()));
        let relevant_names: HashSet<String> = relevant_neurons
            .iter()
            .map(|neuron| neuron.name())
            .collect();
        let ignored_names: HashSet<String> = ignored_neurons
            .iter()
            .map(|neuron| neuron.name())
            .collect();

        let plexus = Self {
            id: Uuid::now_v7(),
            inproc_ganglion: inproc.clone(),
            external_ganglia: Arc::new(RwLock::new(HashMap::new())),
            internal_ganglia: Arc::new(RwLock::new(HashMap::new())),
            neurons: Arc::new(RwLock::new(HashMap::new())),
            reactant_factories: Arc::new(RwLock::new(HashMap::new())),
            neuron_ganglia: Arc::new(RwLock::new(HashSet::new())),
            // De-duplication entries are evicted after 60 seconds without access.
            reactions: Cache::builder()
                .time_to_idle(Duration::from_secs(60))
                .build(),
            relevant_neurons: relevant_names,
            ignored_neurons: ignored_names,
        };

        // The lock guard is a temporary and is released at the end of this statement.
        let inproc_id = inproc.lock().await.unique_id();
        plexus
            .internal_ganglia
            .write()
            .await
            .insert(inproc_id, inproc);
        plexus
    }

    /// Same as [`Plexus::new`], with the result wrapped in `Arc<Mutex<_>>` for sharing.
    pub async fn new_shared(
        relevant_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
        ignored_neurons: Vec<Arc<dyn NeuronErased + Send + Sync + 'static>>,
    ) -> Arc<Mutex<Self>> {
        let plexus = Self::new(relevant_neurons, ignored_neurons).await;
        Arc::new(Mutex::new(plexus))
    }

    /// Registers an internal ganglion under its `unique_id()` and re-wires neuron/ganglion
    /// pairings.
    ///
    /// # Errors
    /// Propagates any error from re-wiring the pairings.
    pub async fn infuse_ganglion<G>(&mut self, ganglion: Arc<Mutex<G>>) -> Result<(), PlexusError>
    where
        G: GanglionInternal + Ganglion + Send + Sync + 'static,
    {
        let ganglion_id = ganglion.lock().await.unique_id();
        self.internal_ganglia
            .write()
            .await
            .insert(ganglion_id, ganglion);
        self.update_neuron_ganglia().await
    }

    /// Registers an external ganglion under its `unique_id()` and re-wires neuron/ganglion
    /// pairings.
    ///
    /// # Errors
    /// Propagates any error from re-wiring the pairings.
    pub async fn infuse_external_ganglion<G>(
        &mut self,
        ganglion: Arc<Mutex<G>>,
    ) -> Result<(), PlexusError>
    where
        G: GanglionExternal + Send + Sync + 'static,
    {
        let ganglion_id = ganglion.lock().await.unique_id();
        self.external_ganglia
            .write()
            .await
            .insert(ganglion_id, ganglion);
        self.update_neuron_ganglia().await
    }

    /// Removes the internal ganglion registered under `ganglion_id`.
    ///
    /// Returns the removed ganglion, or `None` when the id was unknown. Pairings are only
    /// re-wired when something was actually removed.
    pub async fn excise_ganglion_by_id(
        &mut self,
        ganglion_id: Uuid,
    ) -> Result<Option<Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>, PlexusError> {
        let removed = self.internal_ganglia.write().await.remove(&ganglion_id);
        if removed.is_none() {
            return Ok(None);
        }
        self.update_neuron_ganglia().await?;
        Ok(removed)
    }

    /// Removes the given internal ganglion, looked up by its `unique_id()`.
    pub async fn excise_ganglion<G>(
        &mut self,
        ganglion: Arc<Mutex<G>>,
    ) -> Result<Option<Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>, PlexusError>
    where
        G: GanglionInternal + Send + Sync + ?Sized + 'static,
    {
        let ganglion_id = ganglion.lock().await.unique_id();
        self.excise_ganglion_by_id(ganglion_id).await
    }

    /// Removes the external ganglion registered under `ganglion_id`.
    ///
    /// Returns the removed ganglion, or `None` when the id was unknown. Pairings are only
    /// re-wired when something was actually removed.
    pub async fn excise_external_ganglion_by_id(
        &mut self,
        ganglion_id: Uuid,
    ) -> Result<Option<Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>, PlexusError> {
        let removed = self.external_ganglia.write().await.remove(&ganglion_id);
        if removed.is_none() {
            return Ok(None);
        }
        self.update_neuron_ganglia().await?;
        Ok(removed)
    }

    /// Removes the given external ganglion, looked up by its `unique_id()`.
    pub async fn excise_external_ganglion<G>(
        &mut self,
        ganglion: Arc<Mutex<G>>,
    ) -> Result<Option<Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>, PlexusError>
    where
        G: GanglionExternal + Send + Sync + ?Sized + 'static,
    {
        let ganglion_id = ganglion.lock().await.unique_id();
        self.excise_external_ganglion_by_id(ganglion_id).await
    }

    /// Registers (or replaces) `neuron` under its name and re-wires neuron/ganglion pairings.
    pub async fn update_neurons(
        &self,
        neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
    ) -> Result<(), PlexusError> {
        // The write guard is a temporary: it is released before the re-wiring below takes
        // its own read lock on the same registry.
        self.neurons.write().await.insert(neuron.name(), neuron);
        self.update_neuron_ganglia().await
    }

    /// Re-wires neuron/ganglion pairings using this plexus' registries.
    async fn update_neuron_ganglia(&self) -> Result<(), PlexusError> {
        update_neuron_ganglia_internal(
            &self.neurons,
            &self.internal_ganglia,
            &self.external_ganglia,
            &self.neuron_ganglia,
            &self.reactant_factories,
            &self.reactions,
        )
        .await
    }
}
impl Ganglion for Plexus {
    /// Reports whether this plexus handles `neuron`.
    ///
    /// A neuron is accepted when the relevant-set is empty or contains its name, and its name
    /// is not in the ignored-set.
    fn capable<T, C>(&mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
    where
        C: Codec<T> + CodecName + Send + Sync + 'static,
        T: Send + Sync + 'static,
    {
        let name = neuron.name();
        let is_relevant =
            self.relevant_neurons.is_empty() || self.relevant_neurons.contains(&name);
        is_relevant && !self.ignored_neurons.contains(&name)
    }

    /// Adapts `neuron` on the in-process ganglion, registers its reactant factories and
    /// re-wires the neuron/ganglion pairings.
    ///
    /// Neurons rejected by `capable` resolve to `Ok(())` without any work. Otherwise the
    /// result of the in-process adaptation is returned, unless the subsequent re-wiring
    /// fails, in which case that failure (converted with `GanglionError::from_plexus_error`)
    /// takes precedence.
    fn adapt<T, C>(
        &mut self,
        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>
    where
        C: Codec<T> + CodecName + Send + Sync + 'static,
        T: Send + Sync + 'static,
    {
        tracing::debug!(neuron = %neuron.name(), "Plexus::adapt - Adapting neuron");
        if !self.capable(neuron.clone()) {
            return Box::pin(async { Ok(()) });
        }

        // Everything the returned future needs is captured as owned handles so that the
        // future can be `'static`.
        let plexus_id = self.id;
        let inproc = self.inproc_ganglion.clone();
        let neuron_registry = self.neurons.clone();
        let factory_registry = self.reactant_factories.clone();
        let internal = self.internal_ganglia.clone();
        let external = self.external_ganglia.clone();
        let pairings = self.neuron_ganglia.clone();
        let reaction_cache = self.reactions.clone();
        let erased = erase_neuron(neuron.clone());

        Box::pin(async move {
            // The lock guard is a temporary: only obtaining the future happens under the
            // lock, awaiting it does not.
            let adapt_future = inproc.lock().await.adapt(neuron.clone());
            let adapt_result = adapt_future.await;

            let factories = PlexusReactantFactories {
                internal_factory: Arc::new(PlexusInternalReactantFactory::<T, C>::new()),
                external_internal_factory: Arc::new(
                    PlexusExternalInternalReactantFactory::<T, C>::new(),
                ),
                external_external_factory: Arc::new(
                    PlexusExternalExternalReactantFactory::<T, C>::new(),
                ),
            };
            factory_registry
                .write()
                .await
                .insert(neuron.name(), factories);

            let rewired = update_neurons_internal(
                &neuron_registry,
                &internal,
                &external,
                &pairings,
                &factory_registry,
                &reaction_cache,
                erased,
            )
            .await;

            match rewired {
                Ok(()) => adapt_result,
                Err(plexus_error) => Err(GanglionError::from_plexus_error(
                    plexus_error,
                    neuron.name(),
                    "Plexus".to_string(),
                    plexus_id,
                )),
            }
        })
    }
}
/// Registers (or replaces) `neuron` under its name in `neurons_map`, then re-wires the
/// neuron/ganglion pairings.
///
/// Free-function counterpart of `Plexus::update_neurons` that works on shared registry
/// handles, so it can be called from futures that do not borrow the plexus.
///
/// # Errors
/// Propagates any error from `update_neuron_ganglia_internal`.
#[allow(clippy::type_complexity)]
async fn update_neurons_internal(
    neurons_map: &Arc<RwLock<HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>>>,
    internal_ganglia: &Arc<
        RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
    >,
    external_ganglia: &Arc<
        RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
    >,
    neuron_ganglia: &Arc<RwLock<HashSet<(String, Uuid)>>>,
    reactant_factories: &Arc<RwLock<HashMap<String, PlexusReactantFactories>>>,
    reactions: &Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
    neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
) -> Result<(), PlexusError> {
    // The write guard is a temporary, so it is released before the re-wiring below acquires
    // a read lock on the same registry.
    neurons_map.write().await.insert(neuron.name(), neuron);

    update_neuron_ganglia_internal(
        neurons_map,
        internal_ganglia,
        external_ganglia,
        neuron_ganglia,
        reactant_factories,
        reactions,
    )
    .await
}
/// Reconciles the set of (neuron, ganglion) pairings and attaches plexus reactants to every
/// pairing that was not recorded before.
///
/// Steps, in order:
/// 1. Take read locks on the neuron, internal-ganglia and external-ganglia registries (held
///    until the function returns).
/// 2. Build the full neuron × internal-ganglion and neuron × external-ganglion products.
/// 3. Under the write lock on `neuron_ganglia_set`, compute which pairs are new and replace
///    the set with the union of both products.
/// 4. For each new internal pair, build one reactant with the neuron's `internal_factory`
///    and register it on the ganglion via `react`.
/// 5. For each new external pair, build one typed and one raw reactant and register both on
///    the ganglion via `react`.
///
/// # Errors
/// * `PlexusError::ReactantCreation` when no factories are registered for a neuron.
/// * `PlexusError::Ganglion` (via `?`) when a ganglion's `react` fails.
///
/// NOTE(review): the pairing set is replaced in step 3, before the fallible steps 4 and 5.
/// If one of them returns early, the affected pairs are already recorded and will not be
/// considered "new" on the next call — confirm this is intended.
#[allow(clippy::type_complexity)]
async fn update_neuron_ganglia_internal(
neurons_map: &Arc<RwLock<HashMap<String, Arc<dyn NeuronErased + Send + Sync + 'static>>>>,
internal_ganglia_map: &Arc<
RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
>,
external_ganglia_map: &Arc<
RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
>,
neuron_ganglia_set: &Arc<RwLock<HashSet<(String, Uuid)>>>,
reactant_factories_map: &Arc<RwLock<HashMap<String, PlexusReactantFactories>>>,
reactions: &Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
) -> Result<(), PlexusError> {
// These three read guards are never dropped explicitly; they live until the function ends.
let neurons = neurons_map.read().await;
let internal_ganglia = internal_ganglia_map.read().await;
let external_ganglia = external_ganglia_map.read().await;
// Every (neuron name, internal ganglion id) pair that should exist.
let all_internal_neuron_ganglia: HashSet<(String, Uuid)> = neurons
.iter()
.cartesian_product(internal_ganglia.iter())
.map(|((neuron_name, _neuron), (ganglion_id, _ganglion_mutex))| {
(neuron_name.clone(), *ganglion_id)
})
.collect();
// Every (neuron name, external ganglion id) pair that should exist.
let all_external_neuron_ganglia: HashSet<(String, Uuid)> = neurons
.iter()
.cartesian_product(external_ganglia.iter())
.map(|((neuron_name, _neuron), (ganglion_id, _ganglion_mutex))| {
(neuron_name.clone(), *ganglion_id)
})
.collect();
let mut neuron_ganglia = neuron_ganglia_set.write().await;
// Pairs that were not recorded by a previous call.
let new_internal_combinations: Vec<(String, Uuid)> = all_internal_neuron_ganglia
.difference(&neuron_ganglia)
.cloned()
.collect();
let new_external_combinations: Vec<(String, Uuid)> = all_external_neuron_ganglia
.difference(&neuron_ganglia)
.cloned()
.collect();
// Replace the recorded set wholesale; pairs of removed neurons/ganglia disappear here.
*neuron_ganglia = all_internal_neuron_ganglia
.union(&all_external_neuron_ganglia)
.cloned()
.collect();
// Release the write lock before any reactant is created or any ganglion is locked.
drop(neuron_ganglia);
let mut internal_reactants_by_ganglion: HashMap<
(String, Uuid),
Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
> = HashMap::new();
for (neuron_name, ganglion_id) in new_internal_combinations {
// The factory read guard is a temporary that lives only for this statement.
let internal_reactant = reactant_factories_map
.read()
.await
.get(&neuron_name)
.ok_or_else(|| PlexusError::ReactantCreation {
neuron_name: neuron_name.clone(),
})?
.internal_factory
.create_reactant(
ganglion_id,
internal_ganglia_map.clone(),
external_ganglia_map.clone(),
reactions.clone(),
);
internal_reactants_by_ganglion
.entry((neuron_name, ganglion_id))
.or_default()
.push(internal_reactant);
}
// Register the new reactants on each internal ganglion, with no error reactants.
for ((neuron_name, ganglion_id), reactants) in internal_reactants_by_ganglion {
if let Some(ganglion_mutex) = internal_ganglia.get(&ganglion_id) {
let mut ganglion = ganglion_mutex.lock().await;
ganglion.react(neuron_name, reactants, vec![]).await?;
}
}
#[allow(clippy::type_complexity)]
let mut external_reactants_by_ganglion: HashMap<
(String, Uuid),
(
Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
),
> = HashMap::new();
for (neuron_name, ganglion_id) in new_external_combinations {
// Typed reactant: created with the internal ganglia map only.
let external_internal_reactant = reactant_factories_map
.read()
.await
.get(&neuron_name)
.ok_or_else(|| PlexusError::ReactantCreation {
neuron_name: neuron_name.clone(),
})?
.external_internal_factory
.create_reactant(ganglion_id, internal_ganglia_map.clone(), reactions.clone());
// Raw reactant: created with the external ganglia map only.
let external_external_reactant = reactant_factories_map
.read()
.await
.get(&neuron_name)
.ok_or_else(|| PlexusError::ReactantCreation {
neuron_name: neuron_name.clone(),
})?
.external_external_factory
.create_reactant(ganglion_id, external_ganglia_map.clone(), reactions.clone());
let entry = external_reactants_by_ganglion
.entry((neuron_name.clone(), ganglion_id))
.or_default();
entry.0.push(external_internal_reactant);
entry.1.push(external_external_reactant);
}
// Register typed and raw reactants on each external ganglion, with no error reactants.
for ((neuron_name, ganglion_id), (reactants, reactants_raw)) in external_reactants_by_ganglion {
if let Some(ganglion_mutex) = external_ganglia.get(&ganglion_id) {
let mut ganglion = ganglion_mutex.lock().await;
ganglion
.react(neuron_name, reactants, reactants_raw, vec![])
.await?;
}
}
Ok(())
}
/// `GanglionInternal` for [`Plexus`] delegates every operation to the in-process ganglion.
///
/// All three delegating methods follow the same locking discipline: the in-process ganglion
/// is locked only long enough to *obtain* the `'static` future from it, and the lock is
/// released before that future is awaited. This keeps the ganglion available to reactants
/// that call back into it while the operation is in flight.
impl GanglionInternal for Plexus {
    /// Transmits `payload` through the in-process ganglion.
    fn transmit(
        &mut self,
        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>> {
        let inproc_ganglion = self.inproc_ganglion.clone();
        Box::pin(async move {
            let future = {
                let mut ganglion = inproc_ganglion.lock().await;
                ganglion.transmit(payload)
            };
            future.await
        })
    }

    /// Registers `reactants` and `error_reactants` for `neuron_name` on the in-process
    /// ganglion.
    fn react(
        &mut self,
        neuron_name: String,
        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
        let inproc_ganglion = self.inproc_ganglion.clone();
        Box::pin(async move {
            let future = {
                let mut ganglion = inproc_ganglion.lock().await;
                ganglion.react(neuron_name, reactants, error_reactants)
            };
            future.await
        })
    }

    /// Registers reactants for several neurons at once on the in-process ganglion.
    ///
    /// `reactions` maps a neuron name to its `(reactants, error_reactants)` pair.
    fn react_many(
        &mut self,
        reactions: HashMap<
            String,
            (
                Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
                Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
            ),
        >,
    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
        let inproc_ganglion = self.inproc_ganglion.clone();
        Box::pin(async move {
            // Release the lock before awaiting, exactly like `transmit` and `react` do;
            // previously the guard was held for the whole duration of the returned future.
            let future = {
                let mut ganglion = inproc_ganglion.lock().await;
                ganglion.react_many(reactions)
            };
            future.await
        })
    }

    /// Returns the identifier generated for this plexus at construction time.
    fn unique_id(&self) -> Uuid {
        self.id
    }
}
/// Type-erased factory for the reactant that a plexus attaches to an *internal* ganglion.
///
/// Erasing the payload/codec types lets factories for different neurons live in one map.
pub trait ErasedInternalReactantFactory: Send + Sync + 'static {
/// Builds a reactant for the ganglion identified by `current_ganglion_id`.
///
/// The reactant receives shared handles to both ganglia registries and to the reaction
/// de-duplication cache.
#[allow(clippy::type_complexity)]
fn create_reactant(
&self,
current_ganglion_id: Uuid,
internal_ganglia: Arc<
RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
>,
external_ganglia: Arc<
RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
>,
reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
) -> Arc<dyn ReactantErased + Send + Sync + 'static>;
}
/// Type-erased factory for the *typed* reactant that a plexus attaches to an external
/// ganglion; the produced reactant only knows about the internal ganglia registry.
pub trait ErasedExternalInternalReactantFactory: Send + Sync + 'static {
/// Builds a typed reactant for the external ganglion identified by `current_ganglion_id`,
/// handing it the internal ganglia registry and the reaction de-duplication cache.
#[allow(clippy::type_complexity)]
fn create_reactant(
&self,
current_ganglion_id: Uuid,
internal_ganglia: Arc<
RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
>,
reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
) -> Arc<dyn ReactantErased + Send + Sync + 'static>;
}
/// Type-erased factory for the *raw* reactant that a plexus attaches to an external
/// ganglion; the produced reactant only knows about the external ganglia registry.
pub trait ErasedExternalExternalReactantFactory: Send + Sync + 'static {
/// Builds a raw reactant for the external ganglion identified by `current_ganglion_id`,
/// handing it the external ganglia registry and the reaction de-duplication cache.
#[allow(clippy::type_complexity)]
fn create_reactant(
&self,
current_ganglion_id: Uuid,
external_ganglia: Arc<
RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
>,
reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
) -> Arc<dyn ReactantRawErased + Send + Sync + 'static>;
}
/// Stateless factory for `PlexusInternalReactant<T, C>`.
///
/// The struct stores no data; `T` (payload type) and `C` (codec type) are carried purely at
/// the type level.
pub struct PlexusInternalReactantFactory<T, C> {
    _phantom: PhantomData<(T, C)>,
}

impl<T, C> PlexusInternalReactantFactory<T, C> {
    /// Creates the factory.
    pub fn new() -> Self {
        let _phantom = PhantomData;
        Self { _phantom }
    }
}

// Implemented by hand: a derived `Default` would demand `T: Default` and `C: Default`.
impl<T, C> Default for PlexusInternalReactantFactory<T, C> {
    fn default() -> Self {
        Self::new()
    }
}
impl<T, C> ErasedInternalReactantFactory for PlexusInternalReactantFactory<T, C>
where
    C: Codec<T> + CodecName + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    /// Builds a `PlexusInternalReactant<T, C>` for `current_ganglion_id` and returns it in
    /// type-erased form.
    fn create_reactant(
        &self,
        current_ganglion_id: Uuid,
        internal_ganglia: Arc<
            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
        >,
        external_ganglia: Arc<
            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
        >,
        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
    ) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
        let reactant = PlexusInternalReactant::<T, C>::new(
            current_ganglion_id,
            internal_ganglia,
            external_ganglia,
            reactions,
        );
        erase_reactant(Box::new(reactant))
    }
}
/// Stateless factory for `PlexusExternalInternalReactant<T, C>`.
///
/// The struct stores no data; `T` (payload type) and `C` (codec type) are carried purely at
/// the type level.
pub struct PlexusExternalInternalReactantFactory<T, C> {
    _phantom: PhantomData<(T, C)>,
}

impl<T, C> PlexusExternalInternalReactantFactory<T, C> {
    /// Creates the factory.
    pub fn new() -> Self {
        let _phantom = PhantomData;
        Self { _phantom }
    }
}

// Implemented by hand: a derived `Default` would demand `T: Default` and `C: Default`.
impl<T, C> Default for PlexusExternalInternalReactantFactory<T, C> {
    fn default() -> Self {
        Self::new()
    }
}
impl<T, C> ErasedExternalInternalReactantFactory for PlexusExternalInternalReactantFactory<T, C>
where
    C: Codec<T> + CodecName + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    /// Builds a `PlexusExternalInternalReactant<T, C>` for `current_ganglion_id` and returns
    /// it in type-erased form.
    fn create_reactant(
        &self,
        current_ganglion_id: Uuid,
        internal_ganglia: Arc<
            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
        >,
        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
    ) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
        let reactant = PlexusExternalInternalReactant::<T, C>::new(
            current_ganglion_id,
            internal_ganglia,
            reactions,
        );
        erase_reactant(Box::new(reactant))
    }
}
/// Stateless factory for `PlexusExternalExternalReactant<T, C>`.
///
/// The struct stores no data; `T` (payload type) and `C` (codec type) are carried purely at
/// the type level.
pub struct PlexusExternalExternalReactantFactory<T, C> {
    _phantom: PhantomData<(T, C)>,
}

impl<T, C> PlexusExternalExternalReactantFactory<T, C> {
    /// Creates the factory.
    pub fn new() -> Self {
        let _phantom = PhantomData;
        Self { _phantom }
    }
}

// Implemented by hand: a derived `Default` would demand `T: Default` and `C: Default`.
impl<T, C> Default for PlexusExternalExternalReactantFactory<T, C> {
    fn default() -> Self {
        Self::new()
    }
}
impl<T, C> ErasedExternalExternalReactantFactory for PlexusExternalExternalReactantFactory<T, C>
where
    C: Codec<T> + CodecName + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    /// Builds a `PlexusExternalExternalReactant<T, C>` for `current_ganglion_id` and returns
    /// it as a type-erased raw reactant.
    fn create_reactant(
        &self,
        current_ganglion_id: Uuid,
        external_ganglia: Arc<
            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
        >,
        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
    ) -> Arc<dyn ReactantRawErased + Send + Sync + 'static> {
        let reactant = PlexusExternalExternalReactant::<T, C>::new(
            current_ganglion_id,
            external_ganglia,
            reactions,
        );
        erase_reactant_raw(Box::new(reactant))
    }
}
/// Reactant attached to an internal ganglion.
///
/// It carries the id of the ganglion it is attached to, shared handles to both ganglia
/// registries, and the reaction de-duplication cache.
#[allow(clippy::type_complexity)]
pub struct PlexusInternalReactant<T, C>
where
    C: Codec<T> + CodecName + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    current_ganglion_id: Uuid,
    internal_ganglia:
        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>>,
    external_ganglia:
        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>>,
    reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
    _phantom: PhantomData<(T, C)>,
}

// Implemented by hand so that cloning does not require `T: Clone` or `C: Clone`.
impl<T, C> Clone for PlexusInternalReactant<T, C>
where
    C: Codec<T> + CodecName + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    fn clone(&self) -> Self {
        let Self {
            current_ganglion_id,
            internal_ganglia,
            external_ganglia,
            reactions,
            _phantom: _,
        } = self;
        Self {
            current_ganglion_id: *current_ganglion_id,
            internal_ganglia: Arc::clone(internal_ganglia),
            external_ganglia: Arc::clone(external_ganglia),
            reactions: reactions.clone(),
            _phantom: PhantomData,
        }
    }
}

impl<T, C> PlexusInternalReactant<T, C>
where
    C: Codec<T> + CodecName + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    /// Creates a reactant for the ganglion identified by `current_ganglion_id`.
    #[allow(clippy::type_complexity)]
    pub fn new(
        current_ganglion_id: Uuid,
        internal_ganglia: Arc<
            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
        >,
        external_ganglia: Arc<
            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
        >,
        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
    ) -> Self {
        let _phantom = PhantomData;
        Self {
            current_ganglion_id,
            internal_ganglia,
            external_ganglia,
            reactions,
            _phantom,
        }
    }
}
impl<T, C> Reactant<T, C> for PlexusInternalReactant<T, C>
where
    C: Codec<T> + CodecName + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    /// Forwards `payload` to every other ganglion that has not yet seen it.
    ///
    /// The reaction cache holds, per payload `span_id()`, the ids of ganglia that already
    /// reacted. If this reactant's ganglion is already listed, the call returns immediately;
    /// otherwise the id is recorded and the payload is transmitted concurrently to all
    /// internal ganglia (except this one) and all external ganglia that were not listed at
    /// that moment.
    ///
    /// Forwarding is best-effort: individual transmit failures are logged at `warn` level
    /// and do not fail the reaction, so the returned future always resolves to `Ok(())`.
    fn react(
        &self,
        payload: Arc<Payload<T, C>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), crate::reactant::ReactantError>> + Send + 'static>> {
        let current_ganglion_id = self.current_ganglion_id;
        let internal_ganglia = self.internal_ganglia.clone();
        let external_ganglia = self.external_ganglia.clone();
        let reactions = self.reactions.clone();
        // `payload` itself is still needed below to build the tracing span.
        let payload_clone = payload.clone();
        Box::pin(
            async move {
                tracing::debug!("Starting reaction for ganglion {current_ganglion_id}");
                let reaction_set_arc = reactions
                    .get_with(payload_clone.span_id(), async {
                        Arc::new(Mutex::new(HashSet::new()))
                    })
                    .await;
                // Snapshot of the ganglia that already reacted, taken under the lock right
                // after recording this ganglion.
                let reaction_set_copy = {
                    let mut set = reaction_set_arc.lock().await;
                    // `insert` returns `false` when the id was already present.
                    if !set.insert(current_ganglion_id) {
                        tracing::debug!(
                            "Ganglion {current_ganglion_id} already processed reaction, returning early"
                        );
                        return Ok(());
                    }
                    set.clone()
                };
                let erased_payload = erase_payload(payload_clone);
                type UnifiedTransmitFuture = Pin<
                    Box<
                        dyn Future<Output = Result<Vec<()>, crate::ganglion::GanglionError>> + Send,
                    >,
                >;
                // Collect the targets first so the registry read locks are not held while
                // transmitting.
                let internal_ganglia_guard = internal_ganglia.read().await;
                let internal_ganglia_to_process: Vec<_> = internal_ganglia_guard
                    .iter()
                    .filter(|(ganglion_id, _ganglion)| {
                        *ganglion_id != &current_ganglion_id
                            && !reaction_set_copy.contains(ganglion_id)
                    })
                    .map(|(_ganglion_id, ganglion)| ganglion.clone())
                    .collect();
                drop(internal_ganglia_guard);
                let external_ganglia_guard = external_ganglia.read().await;
                let external_ganglia_to_process: Vec<_> = external_ganglia_guard
                    .iter()
                    .filter(|(ganglion_id, _ganglion)| !reaction_set_copy.contains(ganglion_id))
                    .map(|(_ganglion_id, ganglion)| ganglion.clone())
                    .collect();
                drop(external_ganglia_guard);
                let internal_futures =
                    internal_ganglia_to_process
                        .into_iter()
                        .map(|ganglion_mutex| {
                            let payload = erased_payload.clone();
                            Box::pin(async move {
                                let mut ganglion = ganglion_mutex.lock().await;
                                ganglion.transmit(payload).await
                            }) as UnifiedTransmitFuture
                        });
                let external_futures =
                    external_ganglia_to_process
                        .into_iter()
                        .map(|ganglion_mutex| {
                            let payload = erased_payload.clone();
                            Box::pin(async move {
                                let mut ganglion = ganglion_mutex.lock().await;
                                ganglion.transmit(payload).await
                            }) as UnifiedTransmitFuture
                        });
                let all_futures: Vec<_> = internal_futures.chain(external_futures).collect();
                for result in join_all(all_futures).await {
                    if let Err(error) = result {
                        tracing::warn!(
                            "Ganglion {current_ganglion_id} failed to forward payload: {error}"
                        );
                    }
                }
                Ok(())
            }
            .instrument(payload.span_debug("PlexusInternalReactant::react")),
        )
    }

    /// Converts this reactant into its type-erased form.
    fn erase(self: Box<Self>) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
        erase_reactant(self)
    }
}
/// Typed reactant attached to an external ganglion.
///
/// It carries the id of the ganglion it is attached to, a shared handle to the internal
/// ganglia registry, and the reaction de-duplication cache.
#[allow(clippy::type_complexity)]
pub struct PlexusExternalInternalReactant<T, C>
where
    C: Codec<T> + CodecName + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    current_ganglion_id: Uuid,
    internal_ganglia:
        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>>,
    reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
    _phantom: PhantomData<(T, C)>,
}

// Implemented by hand so that cloning does not require `T: Clone` or `C: Clone`.
impl<T, C> Clone for PlexusExternalInternalReactant<T, C>
where
    C: Codec<T> + CodecName + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    fn clone(&self) -> Self {
        let Self {
            current_ganglion_id,
            internal_ganglia,
            reactions,
            _phantom: _,
        } = self;
        Self {
            current_ganglion_id: *current_ganglion_id,
            internal_ganglia: Arc::clone(internal_ganglia),
            reactions: reactions.clone(),
            _phantom: PhantomData,
        }
    }
}

impl<T, C> PlexusExternalInternalReactant<T, C>
where
    C: Codec<T> + CodecName + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    /// Creates a reactant for the external ganglion identified by `current_ganglion_id`.
    #[allow(clippy::type_complexity)]
    pub fn new(
        current_ganglion_id: Uuid,
        internal_ganglia: Arc<
            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
        >,
        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
    ) -> Self {
        let _phantom = PhantomData;
        Self {
            current_ganglion_id,
            internal_ganglia,
            reactions,
            _phantom,
        }
    }
}
impl<T, C> Reactant<T, C> for PlexusExternalInternalReactant<T, C>
where
    C: Codec<T> + CodecName + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    /// Forwards a decoded `payload` to every internal ganglion that has not yet seen it.
    ///
    /// The reaction cache holds, per payload `span_id()`, the ids of ganglia that already
    /// reacted. If this reactant's ganglion is already listed, the call returns immediately;
    /// otherwise the id is recorded and the payload is transmitted concurrently to all
    /// internal ganglia that were not listed at that moment.
    ///
    /// Forwarding is best-effort: individual transmit failures are logged at `warn` level
    /// and do not fail the reaction, so the returned future always resolves to `Ok(())`.
    fn react(
        &self,
        payload: Arc<Payload<T, C>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), crate::reactant::ReactantError>> + Send + 'static>> {
        let current_ganglion_id = self.current_ganglion_id;
        let internal_ganglia = self.internal_ganglia.clone();
        let reactions = self.reactions.clone();
        // `payload` itself is still needed below to build the tracing span.
        let payload_clone = payload.clone();
        Box::pin(
            async move {
                tracing::debug!("Starting reaction for ganglion {current_ganglion_id}");
                let reaction_set_arc = reactions
                    .get_with(payload_clone.span_id(), async {
                        Arc::new(Mutex::new(HashSet::new()))
                    })
                    .await;
                // Snapshot of the ganglia that already reacted, taken under the lock right
                // after recording this ganglion.
                let reaction_set_copy = {
                    let mut set = reaction_set_arc.lock().await;
                    // `insert` returns `false` when the id was already present.
                    if !set.insert(current_ganglion_id) {
                        tracing::debug!(
                            "Ganglion {current_ganglion_id} already processed reaction, returning early"
                        );
                        return Ok(());
                    }
                    set.clone()
                };
                let erased_payload = erase_payload(payload_clone);
                // Collect the targets first so the registry read lock is not held while
                // transmitting.
                let internal_ganglia_guard = internal_ganglia.read().await;
                let internal_ganglia_to_process: Vec<_> = internal_ganglia_guard
                    .iter()
                    .filter(|(ganglion_id, _ganglion)| {
                        !reaction_set_copy.contains(ganglion_id)
                    })
                    .map(|(_ganglion_id, ganglion)| ganglion.clone())
                    .collect();
                drop(internal_ganglia_guard);
                let internal_futures = internal_ganglia_to_process.into_iter().map(
                    |ganglion_mutex| {
                        let payload = erased_payload.clone();
                        Box::pin(async move {
                            let mut ganglion = ganglion_mutex.lock().await;
                            ganglion.transmit(payload).await
                        })
                    },
                );
                for result in join_all(internal_futures).await {
                    if let Err(error) = result {
                        tracing::warn!(
                            "Ganglion {current_ganglion_id} failed to forward payload: {error}"
                        );
                    }
                }
                Ok(())
            }
            .instrument(payload.span_debug("PlexusExternalInternalReactant::react")),
        )
    }

    /// Converts this reactant into its type-erased form.
    fn erase(self: Box<Self>) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
        erase_reactant(self)
    }
}
/// Raw reactant attached to an external ganglion.
///
/// It carries the id of the ganglion it is attached to, a shared handle to the external
/// ganglia registry, and the reaction de-duplication cache.
#[allow(clippy::type_complexity)]
pub struct PlexusExternalExternalReactant<T, C>
where
    C: Codec<T> + CodecName + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    current_ganglion_id: Uuid,
    external_ganglia:
        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>>,
    reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
    _phantom: PhantomData<(T, C)>,
}

// Implemented by hand so that cloning does not require `T: Clone` or `C: Clone`.
impl<T, C> Clone for PlexusExternalExternalReactant<T, C>
where
    C: Codec<T> + CodecName + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    fn clone(&self) -> Self {
        let Self {
            current_ganglion_id,
            external_ganglia,
            reactions,
            _phantom: _,
        } = self;
        Self {
            current_ganglion_id: *current_ganglion_id,
            external_ganglia: Arc::clone(external_ganglia),
            reactions: reactions.clone(),
            _phantom: PhantomData,
        }
    }
}

impl<T, C> PlexusExternalExternalReactant<T, C>
where
    C: Codec<T> + CodecName + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    /// Creates a raw reactant for the external ganglion identified by `current_ganglion_id`.
    #[allow(clippy::type_complexity)]
    pub fn new(
        current_ganglion_id: Uuid,
        external_ganglia: Arc<
            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
        >,
        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
    ) -> Self {
        let _phantom = PhantomData;
        Self {
            current_ganglion_id,
            external_ganglia,
            reactions,
            _phantom,
        }
    }
}
impl<T, C> ReactantRaw<T, C> for PlexusExternalExternalReactant<T, C>
where
    C: Codec<T> + CodecName + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    /// Forwards a raw `payload` to every other external ganglion that has not yet seen it.
    ///
    /// The reaction cache holds, per payload `span_id()`, the ids of ganglia that already
    /// reacted. If this reactant's ganglion is already listed, the call returns immediately;
    /// otherwise the id is recorded and the payload is passed to `transmit_encoded` on all
    /// external ganglia (except this one) that were not listed at that moment.
    ///
    /// Forwarding is best-effort: the individual results are discarded and the returned
    /// future always resolves to `Ok(())`.
    fn react(
        &self,
        payload: Arc<PayloadRaw<T, C>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), crate::reactant::ReactantError>> + Send + 'static>> {
        let current_ganglion_id = self.current_ganglion_id;
        let external_ganglia = self.external_ganglia.clone();
        let reactions = self.reactions.clone();
        // `payload` itself is still needed below to build the tracing span.
        let payload_clone = payload.clone();
        Box::pin(
            async move {
                tracing::debug!("Starting reaction for ganglion {current_ganglion_id}");
                let reaction_set_arc = reactions
                    .get_with(payload_clone.span_id(), async {
                        Arc::new(Mutex::new(HashSet::new()))
                    })
                    .await;
                // Snapshot of the ganglia that already reacted, taken under the lock right
                // after recording this ganglion.
                let reaction_set_copy = {
                    let mut set = reaction_set_arc.lock().await;
                    // `insert` returns `false` when the id was already present.
                    if !set.insert(current_ganglion_id) {
                        tracing::debug!(
                            "PlexusExternalExternalReactant::react - Ganglion {current_ganglion_id} already processed reaction, returning early"
                        );
                        return Ok(());
                    }
                    set.clone()
                };
                let erased_payload = erase_payload_raw(payload_clone);
                // Collect the targets first so the registry read lock is not held while
                // transmitting.
                let external_ganglia_guard = external_ganglia.read().await;
                let external_ganglia_to_process: Vec<_> = external_ganglia_guard
                    .iter()
                    .filter(|(ganglion_id, _ganglion)| {
                        *ganglion_id != &current_ganglion_id
                            && !reaction_set_copy.contains(ganglion_id)
                    })
                    .map(|(_ganglion_id, ganglion)| ganglion.clone())
                    .collect();
                drop(external_ganglia_guard);
                let external_futures =
                    external_ganglia_to_process
                        .into_iter()
                        .map(|ganglion_mutex| {
                            let payload = erased_payload.clone();
                            Box::pin(async move {
                                let mut ganglion = ganglion_mutex.lock().await;
                                ganglion.transmit_encoded(payload).await
                            })
                        });
                // Results are intentionally ignored (best-effort fan-out).
                let _results = join_all(external_futures).await;
                Ok(())
            }
            .instrument(payload.span_debug("PlexusExternalExternalReactant::react")),
        )
    }

    /// Converts this reactant into its type-erased raw form.
    fn erase_raw(self: Box<Self>) -> Arc<dyn ReactantRawErased + Send + Sync + 'static> {
        erase_reactant_raw(self)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::erasure::payload::{erase_payload, erase_payload_raw};
use crate::erasure::reactant::{erase_reactant, erase_reactant_raw};
use crate::logging::TraceContext;
use crate::neuron::NeuronImpl;
use crate::payload::{Payload, PayloadRaw};
use crate::test_utils::{
DebugCodec, DebugStruct, GanglionExternalInprocess, TokioMpscReactant,
TokioMpscReactantRaw, test_namespace,
};
use tokio::sync::mpsc::channel;
use uuid::Uuid;
/// `cartesian_product` must yield exactly the pairs produced by two nested loops.
#[test]
fn test_cartesian_product_equivalence() {
    let neurons = [("neuron1", "Neuron1"), ("neuron2", "Neuron2")];
    let ganglia = [("ganglion1", "uuid1"), ("ganglion2", "uuid2")];

    // Reference result: explicit nested iteration.
    let mut nested_loop_pairs = HashSet::new();
    for neuron in &neurons {
        for ganglion in &ganglia {
            nested_loop_pairs.insert((neuron.0.to_string(), ganglion.0.to_string()));
        }
    }

    // Result under test: itertools' cartesian product.
    let product_pairs: HashSet<(String, String)> = neurons
        .iter()
        .cartesian_product(ganglia.iter())
        .map(|(neuron, ganglion)| (neuron.0.to_string(), ganglion.0.to_string()))
        .collect();

    assert_eq!(nested_loop_pairs, product_pairs);
    assert_eq!(nested_loop_pairs.len(), 4);
}
#[tokio::test]
async fn test_plexus_creation() {
let plexus: Plexus = Plexus::new(vec![], vec![]).await;
assert_eq!(plexus.relevant_neurons.len(), 0);
assert_eq!(plexus.ignored_neurons.len(), 0);
}
/// With no relevant/ignored restrictions, any neuron is accepted.
#[tokio::test]
async fn test_plexus_capable() {
    let namespace = test_namespace();
    let mut plexus = Plexus::new(Vec::new(), Vec::new()).await;
    let neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(namespace.clone()));
    assert!(plexus.capable(neuron));
}
/// Adapting one neuron registers exactly one entry in the neuron registry.
#[tokio::test]
async fn test_plexus_adapt() {
    let namespace = test_namespace();
    let mut plexus = Plexus::new(Vec::new(), Vec::new()).await;
    let neuron = NeuronImpl::<DebugStruct, DebugCodec>::new(namespace.clone());

    let adapted = plexus.adapt(Arc::new(neuron)).await;
    adapted.expect("Failed to adapt neuron");

    let registered = plexus.neurons.read().await;
    assert_eq!(registered.len(), 1);
}
/// Checks that a neuron is reported as capable and that adapting it afterwards leaves
/// exactly one entry in `plexus.neurons`.
#[tokio::test]
async fn test_plexus_ganglion_internal_adapt() {
    let namespace = test_namespace();
    let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;

    let neuron: Arc<NeuronImpl<DebugStruct, DebugCodec>> =
        Arc::new(NeuronImpl::new(namespace.clone()));

    // Capability is checked on a clone so the original handle can still be adapted.
    assert!(plexus.capable(neuron.clone()));

    let adapt_result = plexus.adapt(neuron).await;
    adapt_result.expect("Failed to adapt neuron");

    let registered = plexus.neurons.read().await.len();
    assert_eq!(registered, 1);
}
#[tokio::test]
async fn test_plexus_external_inprocess_transmit_via_adapt() {
let ns = test_namespace();
let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
let neuron_arc = neuron_impl.clone_to_arc();
let mut ganglion_inprocess = GanglionInprocess::new();
let mut ganglion_external_inprocess = GanglionExternalInprocess::new();
let _ = ganglion_inprocess.adapt(neuron_arc.clone()).await;
ganglion_external_inprocess
.adapt(neuron_arc.clone())
.await
.expect("Failed to adapt neuron");
let erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
erase_reactant(Box::new(TokioMpscReactant::new(tx1))),
erase_reactant(Box::new(TokioMpscReactant::new(tx2))),
];
let erased_raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx1))),
erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx2))),
];
ganglion_inprocess
.react(neuron_arc.name(), erased_reactants, vec![])
.await
.expect("Failed to react");
ganglion_external_inprocess
.react(neuron_arc.name(), vec![], erased_raw_reactants, vec![])
.await
.expect("Failed to react raw");
let test_payload = Payload::with_correlation(
DebugStruct {
foo: 42,
bar: "test".to_string(),
},
neuron_arc.clone(),
None,
);
let test_payload_raw = PayloadRaw::with_correlation(
DebugCodec::encode(&DebugStruct {
foo: 42,
bar: "test".to_string(),
})
.expect("Failed to encode test data"),
neuron_arc.clone(),
None,
);
let erased_payload = erase_payload(test_payload);
ganglion_inprocess
.transmit(erased_payload)
.await
.expect("Failed to transmit");
let erased_payload_raw = erase_payload_raw(test_payload_raw);
ganglion_external_inprocess
.transmit_encoded(erased_payload_raw)
.await
.expect("Failed to transmit encoded");
let received1 = rx1.recv().await.expect("Failed to receive payload 1");
let received2 = rx2.recv().await.expect("Failed to receive payload 2");
let raw_received1 = raw_rx1
.recv()
.await
.expect("Failed to receive raw payload 1");
let raw_received2 = raw_rx2
.recv()
.await
.expect("Failed to receive raw payload 2");
assert_eq!(received1.value.foo, 42);
assert_eq!(received2.value.foo, 42);
let decoded1 =
DebugCodec::decode(&raw_received1.value).expect("Failed to decode raw payload 1");
let decoded2 =
DebugCodec::decode(&raw_received2.value).expect("Failed to decode raw payload 2");
assert_eq!(decoded1.foo, 42);
assert_eq!(decoded2.foo, 42);
}
#[tokio::test]
async fn test_plexus_external_inprocess_across_threads() {
let ns = test_namespace();
let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(10);
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
let neuron_arc = neuron_impl.clone_to_arc();
let ganglion_inprocess = Arc::new(Mutex::new(GanglionInprocess::new()));
let ganglion_external_inprocess = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
{
let mut ganglion = ganglion_inprocess.lock().await;
let _ = ganglion.adapt(neuron_arc.clone()).await;
}
{
let mut ganglion = ganglion_external_inprocess.lock().await;
ganglion
.adapt(neuron_arc.clone())
.await
.expect("Failed to adapt neuron");
}
let erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
erase_reactant(Box::new(TokioMpscReactant::new(tx1))),
erase_reactant(Box::new(TokioMpscReactant::new(tx2))),
];
let erased_raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx1))),
erase_reactant_raw(Box::new(TokioMpscReactantRaw::new(raw_tx2))),
];
{
let mut ganglion = ganglion_inprocess.lock().await;
ganglion
.react(neuron_arc.name(), erased_reactants, vec![])
.await
.expect("Failed to react");
}
{
let mut ganglion = ganglion_external_inprocess.lock().await;
ganglion
.react(neuron_arc.name(), vec![], erased_raw_reactants, vec![])
.await
.expect("Failed to react raw");
}
let ganglion_inprocess_clone = ganglion_inprocess.clone();
let ganglion_external_inprocess_clone = ganglion_external_inprocess.clone();
let neuron_arc_clone = neuron_arc.clone();
let task1 = tokio::task::spawn(async move {
let test_payload = Payload::with_correlation(
DebugStruct {
foo: 42,
bar: "test".to_string(),
},
neuron_arc_clone.clone(),
None,
);
let erased_payload = erase_payload(test_payload);
let mut ganglion = ganglion_inprocess_clone.lock().await;
ganglion
.transmit(erased_payload)
.await
.expect("Failed to transmit");
});
let neuron_arc_clone2 = neuron_arc.clone();
let task2 = tokio::task::spawn(async move {
let test_payload_raw = PayloadRaw::with_correlation(
DebugCodec::encode(&DebugStruct {
foo: 42,
bar: "test".to_string(),
})
.expect("Failed to encode test data"),
neuron_arc_clone2.clone(),
None,
);
let erased_payload_raw = erase_payload_raw(test_payload_raw);
let mut ganglion = ganglion_external_inprocess_clone.lock().await;
ganglion
.transmit_encoded(erased_payload_raw)
.await
.expect("Failed to transmit encoded");
});
task1.await.expect("Task 1 failed");
task2.await.expect("Task 2 failed");
let received1 = rx1.recv().await.expect("Failed to receive payload 1");
let received2 = rx2.recv().await.expect("Failed to receive payload 2");
let raw_received1 = raw_rx1
.recv()
.await
.expect("Failed to receive raw payload 1");
let raw_received2 = raw_rx2
.recv()
.await
.expect("Failed to receive raw payload 2");
assert_eq!(received1.value.foo, 42);
assert_eq!(received2.value.foo, 42);
let decoded1 =
DebugCodec::decode(&raw_received1.value).expect("Failed to decode raw payload 1");
let decoded2 =
DebugCodec::decode(&raw_received2.value).expect("Failed to decode raw payload 2");
assert_eq!(decoded1.foo, 42);
assert_eq!(decoded2.foo, 42);
}
/// Verifies that an encoded payload transmitted on an external ganglion reaches a typed
/// reactant registered on the plexus the ganglion was infused into, decoded and with its
/// correlation id intact.
///
/// Delivery is awaited with a bounded timeout rather than a fixed sleep, so the test does
/// not fail spuriously on a slow machine. A short follow-up window still catches a
/// duplicate delivery.
#[tokio::test]
async fn test_plexus_transmit_encoded_external_to_internal() {
    // Upper bound on how long the first delivery may take before the test fails.
    const DELIVERY_TIMEOUT: Duration = Duration::from_secs(5);
    // Window after the first delivery in which a duplicate would still be detected.
    const DUPLICATE_WINDOW: Duration = Duration::from_millis(100);

    let ns = test_namespace();
    let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
    let neuron_arc = neuron_impl.clone_to_arc();
    let (tx_internal, mut rx_internal) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
    let external_ganglion = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
    let mut internal_plexus = Plexus::new(vec![erase_neuron(neuron_arc.clone())], vec![]).await;

    // Scope the guard so the ganglion is unlocked again before it is infused.
    {
        let mut g = external_ganglion.lock().await;
        g.adapt(neuron_arc.clone())
            .await
            .expect("Failed to adapt neuron to external ganglion");
    }
    internal_plexus
        .adapt(neuron_arc.clone())
        .await
        .expect("Failed to adapt neuron to internal plexus");

    let reactants = vec![erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(
        TokioMpscReactant::new(tx_internal),
    ))];
    internal_plexus
        .react(neuron_arc.name(), reactants, vec![])
        .await
        .expect("Failed to add reactant to internal plexus");

    // NOTE(review): the infusion result is discarded here, while the excision test in this
    // module expects the same call to succeed. Confirm whether a failure is tolerated.
    let _ = internal_plexus
        .infuse_external_ganglion(external_ganglion.clone())
        .await;

    let test_data = DebugStruct {
        foo: 123,
        bar: "plexus_transmit_encoded_test".to_string(),
    };
    let encoded_data = neuron_impl
        .encode(&test_data)
        .expect("Failed to encode test data");
    let correlation_id = Uuid::now_v7();
    // Truncating cast: only the low 64 bits of the UUID are kept as the span id.
    let span_id = Uuid::now_v7().as_u128() as u64;
    let payload_raw = Arc::new(PayloadRaw {
        value: Arc::new(encoded_data),
        neuron: neuron_arc.clone(),
        trace: TraceContext::from_parts(correlation_id, span_id, None),
    });

    {
        let mut g = external_ganglion.lock().await;
        let erased_payload_raw = erase_payload_raw(payload_raw.clone());
        g.transmit_encoded(erased_payload_raw)
            .await
            .expect("Failed to transmit encoded payload");
    }

    // Wait for the delivery itself instead of sleeping for a fixed interval.
    let received_payload = tokio::time::timeout(DELIVERY_TIMEOUT, rx_internal.recv())
        .await
        .expect("Timed out waiting for the internal plexus to receive the payload")
        .expect("Should receive decoded payload in internal plexus");

    // Exactly one payload is expected: a second one inside the window is a failure. A
    // timeout or a closed channel both mean that no duplicate arrived.
    let duplicate = tokio::time::timeout(DUPLICATE_WINDOW, rx_internal.recv()).await;
    assert!(
        !matches!(duplicate, Ok(Some(_))),
        "Internal plexus should have received exactly one payload"
    );

    assert_eq!(
        received_payload.value.foo, test_data.foo,
        "Decoded payload should have correct foo value"
    );
    assert_eq!(
        received_payload.value.bar, test_data.bar,
        "Decoded payload should have correct bar value"
    );
    assert_eq!(
        received_payload.correlation_id(), correlation_id,
        "Correlation ID should match"
    );
}
/// Infuses internal and external ganglia into a plexus and excises each of them again,
/// once by unique id and once by instance, checking the plexus's ganglia maps around
/// every step.
#[tokio::test]
async fn test_plexus_ganglion_excision() {
    let mut plexus: Plexus = Plexus::new(vec![], vec![]).await;

    // --- Internal ganglion, excised by id ---
    let first_internal = Arc::new(Mutex::new(GanglionInprocess::new()));
    let first_internal_id = first_internal.lock().await.unique_id();
    plexus
        .infuse_ganglion(first_internal.clone())
        .await
        .expect("Failed to infuse internal ganglion 1");
    assert!(plexus
        .internal_ganglia
        .read()
        .await
        .contains_key(&first_internal_id));

    let removed = plexus
        .excise_ganglion_by_id(first_internal_id)
        .await
        .expect("Failed to excise internal ganglion 1 by ID");
    assert!(removed.is_some());
    assert_eq!(removed.unwrap().lock().await.unique_id(), first_internal_id);
    assert!(!plexus
        .internal_ganglia
        .read()
        .await
        .contains_key(&first_internal_id));

    // --- Internal ganglion, excised by instance ---
    let second_internal = Arc::new(Mutex::new(GanglionInprocess::new()));
    let second_internal_id = second_internal.lock().await.unique_id();
    plexus
        .infuse_ganglion(second_internal.clone())
        .await
        .expect("Failed to infuse internal ganglion 2");

    let removed = plexus
        .excise_ganglion(second_internal.clone())
        .await
        .expect("Failed to excise internal ganglion 2 by instance");
    assert!(removed.is_some());
    assert_eq!(removed.unwrap().lock().await.unique_id(), second_internal_id);
    assert!(!plexus
        .internal_ganglia
        .read()
        .await
        .contains_key(&second_internal_id));

    // --- External ganglion, excised by id ---
    let first_external = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
    let first_external_id = first_external.lock().await.unique_id();
    plexus
        .infuse_external_ganglion(first_external.clone())
        .await
        .expect("Failed to infuse external ganglion 1");
    assert!(plexus
        .external_ganglia
        .read()
        .await
        .contains_key(&first_external_id));

    let removed = plexus
        .excise_external_ganglion_by_id(first_external_id)
        .await
        .expect("Failed to excise external ganglion 1 by ID");
    assert!(removed.is_some());
    assert_eq!(removed.unwrap().lock().await.unique_id(), first_external_id);
    assert!(!plexus
        .external_ganglia
        .read()
        .await
        .contains_key(&first_external_id));

    // --- External ganglion, excised by instance ---
    let second_external = Arc::new(Mutex::new(GanglionExternalInprocess::new()));
    let second_external_id = second_external.lock().await.unique_id();
    plexus
        .infuse_external_ganglion(second_external.clone())
        .await
        .expect("Failed to infuse external ganglion 2");

    let removed = plexus
        .excise_external_ganglion(second_external.clone())
        .await
        .expect("Failed to excise external ganglion 2 by instance");
    assert!(removed.is_some());
    assert_eq!(removed.unwrap().lock().await.unique_id(), second_external_id);
    assert!(!plexus
        .external_ganglia
        .read()
        .await
        .contains_key(&second_external_id));
}
}