use crate::codec::{Codec, CodecName};
use crate::erasure::payload::{PayloadErased, PayloadRawErased};
use crate::erasure::reactant::{ErrorReactantErased, ReactantErased, ReactantRawErased};
use crate::erasure::synapse::{SynapseInternalErased, erase_synapse_internal};
use crate::logging::LogTrace;
use crate::neuron::{Neuron, NeuronError};
use crate::synapse::SynapseInprocess;
use crate::utils::struct_name_of_type;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use parking_lot::RwLock;
use thiserror::Error;
use tokio::sync::Mutex;
use tracing::Instrument;
use uuid::Uuid;
/// Errors reported by ganglion operations.
///
/// Every variant carries the name of the neuron involved plus the name and
/// unique id of the ganglion that reported the failure, so a rendered message
/// identifies both ends of the failed operation.
#[derive(Error, Debug)]
pub enum GanglionError {
/// No synapse is registered under the requested neuron name.
#[error("Synapse not found: {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
SynapseNotFound {
neuron_name: String,
ganglion_name: String,
ganglion_id: Uuid,
},
/// A lock on a synapse could not be acquired.
// NOTE(review): no code in this part of the file constructs this variant
// outside of the tests — confirm it is still produced elsewhere.
#[error(
"Failed to acquire lock on synapse: {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})"
)]
SynapseLock {
neuron_name: String,
ganglion_name: String,
ganglion_id: Uuid,
},
/// A transmission failed; `message` holds the rendered underlying error.
#[error(
"Transmission error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id}): {message}"
)]
Transmit {
neuron_name: String,
ganglion_name: String,
ganglion_id: Uuid,
message: String,
},
/// Encoding failed; produced from `NeuronError::Encode` by `from_neuron_error`.
#[error("Encode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
Encode {
neuron_name: String,
ganglion_name: String,
ganglion_id: Uuid,
},
/// Decoding failed; produced from `NeuronError::Decode` by `from_neuron_error`.
#[error("Decode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
Decode {
neuron_name: String,
ganglion_name: String,
ganglion_id: Uuid,
},
/// Adapting a neuron failed; also the fallback used by `from_plexus_error`
/// for plexus errors that do not already wrap a `GanglionError`.
#[error(
"Adaptation error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})"
)]
Adapt {
neuron_name: String,
ganglion_name: String,
ganglion_id: Uuid,
},
/// The synapse reported a full queue for the neuron.
#[error("Queue full for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
QueueFull {
neuron_name: String,
ganglion_name: String,
ganglion_id: Uuid,
},
}
impl GanglionError {
    /// Lifts a [`NeuronError`] into the equivalent ganglion-level error.
    ///
    /// The neuron name is taken from the source error; `ganglion_name` and
    /// `ganglion_id` identify the ganglion on whose behalf the error is
    /// reported. `Encode` maps to `Encode` and `Decode` maps to `Decode`.
    pub fn from_neuron_error(
        neuron_error: NeuronError,
        ganglion_name: String,
        ganglion_id: Uuid,
    ) -> Self {
        match neuron_error {
            NeuronError::Decode { neuron_name, .. } => Self::Decode {
                neuron_name,
                ganglion_name,
                ganglion_id,
            },
            NeuronError::Encode { neuron_name, .. } => Self::Encode {
                neuron_name,
                ganglion_name,
                ganglion_id,
            },
        }
    }

    /// Converts a plexus error into a [`GanglionError`].
    ///
    /// A plexus error that already wraps a ganglion error is unwrapped and
    /// returned unchanged (the supplied names and id are discarded in that
    /// case). Every other plexus error becomes [`GanglionError::Adapt`] built
    /// from the supplied neuron and ganglion identification.
    pub fn from_plexus_error(
        plexus_error: crate::plexus::PlexusError,
        neuron_name: String,
        ganglion_name: String,
        ganglion_id: Uuid,
    ) -> Self {
        if let crate::plexus::PlexusError::Ganglion(wrapped) = plexus_error {
            return wrapped;
        }
        Self::Adapt {
            neuron_name,
            ganglion_name,
            ganglion_id,
        }
    }
}
/// Capability check and neuron registration shared by ganglion implementations.
pub trait Ganglion {
/// Returns whether this ganglion accepts `neuron`.
///
/// The in-process implementation in this file decides purely by neuron name,
/// using its relevant/ignored name sets.
fn capable<T, C>(&mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
where
C: Codec<T> + CodecName + Send + Sync + 'static,
T: Send + Sync + 'static;
/// Prepares this ganglion to carry traffic for `neuron`.
///
/// Returns a boxed `'static` future resolving to `Ok(())` or a
/// [`GanglionError`]. The in-process implementation in this file registers a
/// synapse under the neuron's name and is a no-op when the neuron is not
/// `capable` or is already registered.
fn adapt<T, C>(
&mut self,
neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>
where
C: Codec<T> + CodecName + Send + Sync + 'static,
T: Send + Sync + 'static;
}
/// Operations of a ganglion that handles decoded (type-erased) payloads only.
pub trait GanglionInternal {
    /// Sends `payload` through the ganglion.
    ///
    /// Returns a boxed `'static` future resolving to the transmission results
    /// or a [`GanglionError`].
    fn transmit(
        &mut self,
        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>>;

    /// Attaches `reactants` and `error_reactants` to the neuron called
    /// `neuron_name`.
    fn react(
        &mut self,
        neuron_name: String,
        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>;

    /// Attaches reactants for several neurons at once.
    ///
    /// The default implementation calls [`react`](Self::react) for every map
    /// entry up front (in the map's iteration order), then returns a future
    /// that awaits the resulting futures one after another and stops at the
    /// first error.
    fn react_many(
        &mut self,
        reactions: HashMap<
            String,
            (
                Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
                Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
            ),
        >,
    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
        // `react` needs `&mut self`, so every call happens here, before the
        // returned future (which must be `'static`) is constructed.
        let pending: Vec<_> = reactions
            .into_iter()
            .map(|(neuron_name, (reactants, error_reactants))| {
                self.react(neuron_name, reactants, error_reactants)
            })
            .collect();
        Box::pin(async move {
            for registration in pending {
                registration.await?;
            }
            Ok(())
        })
    }

    /// Returns the identifier of this ganglion instance.
    fn unique_id(&self) -> Uuid;
}
/// Operations of a ganglion that handles both decoded and raw (encoded)
/// payloads.
pub trait GanglionExternal {
    /// Sends a decoded, type-erased `payload` through the ganglion.
    fn transmit(
        &mut self,
        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>>;

    /// Sends a raw (still encoded) `payload` through the ganglion.
    ///
    /// The future resolves to a pair of result vectors or a [`GanglionError`].
    #[allow(clippy::type_complexity)]
    fn transmit_encoded(
        &mut self,
        payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
    ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), GanglionError>> + Send + 'static>>;

    /// Attaches decoded reactants, raw reactants and error reactants to the
    /// neuron called `neuron_name`.
    fn react(
        &mut self,
        neuron_name: String,
        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
        raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>;

    /// Attaches reactants for several neurons at once.
    ///
    /// The default implementation calls [`react`](Self::react) for every map
    /// entry up front (in the map's iteration order), then returns a future
    /// that awaits the resulting futures one after another and stops at the
    /// first error.
    fn react_many(
        &mut self,
        reactions: HashMap<
            String,
            (
                Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
                Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
                Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
            ),
        >,
    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
        // `react` needs `&mut self`, so every call happens here, before the
        // returned future (which must be `'static`) is constructed.
        let pending: Vec<_> = reactions
            .into_iter()
            .map(|(neuron_name, (reactants, raw_reactants, error_reactants))| {
                self.react(neuron_name, reactants, raw_reactants, error_reactants)
            })
            .collect();
        Box::pin(async move {
            for registration in pending {
                registration.await?;
            }
            Ok(())
        })
    }

    /// Returns the identifier of this ganglion instance.
    fn unique_id(&self) -> Uuid;
}
/// Adapts `neuron` on every ganglion in `ganglia`, in slice order.
///
/// Each ganglion's mutex is locked, `adapt` is called with a clone of the
/// neuron handle, and the resulting future is awaited before moving on to the
/// next ganglion.
///
/// # Errors
///
/// Returns the first [`GanglionError`] produced by an `adapt` call; ganglia
/// later in the slice are not visited after a failure.
pub async fn adapt_all<T, C, G>(
    ganglia: &[Arc<Mutex<G>>],
    neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
) -> Result<(), GanglionError>
where
    C: Codec<T> + CodecName + Send + Sync + 'static,
    T: Send + Sync + 'static,
    G: Ganglion + ?Sized,
{
    for shared_ganglion in ganglia.iter() {
        // The guard is a temporary that lives until the end of this statement,
        // so the ganglion stays locked while its adapt future is awaited.
        shared_ganglion
            .lock()
            .await
            .adapt(Arc::clone(&neuron))
            .await?;
    }
    Ok(())
}
/// Adapts one neuron on one or more ganglia.
///
/// Usage: `adapt_all!(neuron, ganglion_a, ganglion_b, ...)` (a trailing comma
/// is accepted). Note that the neuron comes first here, unlike the `adapt_all`
/// function, which takes the ganglia slice first.
///
/// Expands to an `async` block that, for each ganglion expression in order,
/// calls `.lock().await`, then `.adapt(neuron.clone()).await?`, and finally
/// evaluates to `Result<(), GanglionError>`. The block must be awaited by the
/// caller; the first error stops the sequence.
///
/// The `$neuron` expression is expanded once per ganglion, so it should be a
/// plain binding rather than an expression with side effects.
#[macro_export]
macro_rules! adapt_all {
($neuron:expr, $($ganglion:expr),+ $(,)?) => {
async {
$(
// Lock, adapt and await within one statement: the guard is released
// when the statement ends, before the next ganglion is locked.
$ganglion.lock().await.adapt($neuron.clone()).await?;
)+
Result::<(), $crate::ganglion::GanglionError>::Ok(())
}
};
}
/// Adapts several neurons on one or more ganglia.
///
/// Usage: `adapt_many!([neuron_a, neuron_b, ...], ganglion_a, ganglion_b, ...)`
/// (trailing commas are accepted in both lists).
///
/// Expands to an `async` block evaluating to `Result<(), GanglionError>`. For
/// each ganglion, in order, the lock is taken once and every neuron is adapted
/// in list order while the lock is held; the first error stops everything.
///
/// Each `$neuron` expression is expanded once per ganglion, so the neurons
/// should be plain bindings rather than expressions with side effects.
#[macro_export]
macro_rules! adapt_many {
// Base case: a single ganglion. Lock it once and adapt every neuron.
([$($neuron:expr),+ $(,)?], $ganglion:expr $(,)?) => {
async {
let mut g = $ganglion.lock().await;
$(
g.adapt($neuron.clone()).await?;
)+
Result::<(), $crate::ganglion::GanglionError>::Ok(())
}
};
// Recursive case: handle the head ganglion, then recurse on the tail.
([$($neuron:expr),+ $(,)?], $head_ganglion:expr, $($tail_ganglion:expr),+ $(,)?) => {
async {
{
// Inner block so the head ganglion's guard is dropped before the
// remaining ganglia are processed.
let mut g = $head_ganglion.lock().await;
$(
g.adapt($neuron.clone()).await?;
)+
}
$crate::adapt_many!([$($neuron),+], $($tail_ganglion),+).await
}
};
}
/// In-process ganglion that keeps one type-erased synapse per neuron name.
pub struct GanglionInprocess {
/// Identifier of this instance; generated with `Uuid::now_v7()` by the
/// constructors and returned by `unique_id`.
id: Uuid,
/// Registered synapses keyed by neuron name. Each synapse sits behind its
/// own `parking_lot::RwLock` and is shared through an `Arc`.
synapses_by_name:
HashMap<String, Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>>,
/// Allow-list of neuron names; when non-empty, `capable` rejects any neuron
/// whose name is not in it.
relevant_neurons: HashSet<String>,
/// Deny-list of neuron names; `capable` rejects any neuron whose name is in it.
ignored_neurons: HashSet<String>,
}
impl GanglionInprocess {
    /// Creates a ganglion with a fresh id, no synapses and no name filters.
    pub fn new() -> Self {
        Self::new_with_filters(HashSet::new(), HashSet::new())
    }

    /// Creates an unfiltered ganglion wrapped in `Arc<Mutex<_>>` so it can be
    /// shared between tasks.
    pub fn new_shared() -> Arc<Mutex<Self>> {
        let ganglion = Self::new();
        Arc::new(Mutex::new(ganglion))
    }

    /// Creates a ganglion with a fresh id and no synapses, using the given
    /// name filters.
    ///
    /// `relevant_neurons` acts as an allow-list when non-empty;
    /// `ignored_neurons` acts as a deny-list.
    pub fn new_with_filters(
        relevant_neurons: HashSet<String>,
        ignored_neurons: HashSet<String>,
    ) -> Self {
        let id = Uuid::now_v7();
        Self {
            id,
            synapses_by_name: HashMap::new(),
            relevant_neurons,
            ignored_neurons,
        }
    }

    /// Looks up the synapse registered under `name`, returning a new handle
    /// to the same shared synapse, or `None` when nothing is registered.
    fn get_synapse_by_name(
        &self,
        name: &str,
    ) -> Option<Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>> {
        self.synapses_by_name.get(name).map(Arc::clone)
    }
}
impl Default for GanglionInprocess {
fn default() -> Self {
Self::new()
}
}
impl Ganglion for GanglionInprocess {
    /// Returns whether this ganglion accepts `neuron`, judged by its name.
    ///
    /// A neuron is accepted when the allow-list (`relevant_neurons`) is empty
    /// or contains the name, and the deny-list (`ignored_neurons`) does not
    /// contain the name.
    fn capable<T, C>(&mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
    where
        C: Codec<T> + CodecName + Send + Sync + 'static,
        T: Send + Sync + 'static,
    {
        let neuron_name = neuron.name();
        let allowed =
            self.relevant_neurons.is_empty() || self.relevant_neurons.contains(&neuron_name);
        // `contains` on an empty set is already false, so no emptiness check
        // is needed for the deny-list.
        allowed && !self.ignored_neurons.contains(&neuron_name)
    }

    /// Registers an in-process synapse for `neuron` under the neuron's name.
    ///
    /// Nothing happens when the neuron is not [`capable`](Ganglion::capable)
    /// or when a synapse is already registered under that name. All work is
    /// done synchronously; the returned future always resolves to `Ok(())`.
    fn adapt<T, C>(
        &mut self,
        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>
    where
        C: Codec<T> + CodecName + Send + Sync + 'static,
        T: Send + Sync + 'static,
    {
        if !self.capable(neuron.clone()) {
            return Box::pin(async { Ok(()) });
        }
        // Entry API: a single map lookup decides whether a synapse must be
        // created, and an existing registration is left untouched.
        if let std::collections::hash_map::Entry::Vacant(slot) =
            self.synapses_by_name.entry(neuron.name())
        {
            let synapse = SynapseInprocess::<T, C>::new(neuron, vec![], vec![]);
            slot.insert(erase_synapse_internal(synapse));
        }
        Box::pin(async { Ok(()) })
    }
}
impl GanglionInternal for GanglionInprocess {
    /// Hands `payload` to the synapse registered for the payload's neuron.
    ///
    /// The synapse's `transmit_erased` is called synchronously (under the
    /// synapse's read lock); the returned future awaits its result inside the
    /// payload's debug span.
    ///
    /// # Errors
    ///
    /// * [`GanglionError::SynapseNotFound`] when no synapse is registered for
    ///   the neuron.
    /// * [`GanglionError::QueueFull`] when the synapse reports a full queue.
    /// * [`GanglionError::Transmit`] for any other synapse error, with the
    ///   rendered error in `message`.
    fn transmit(
        &mut self,
        payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>> {
        let neuron_name = payload.get_neuron_name();
        tracing::debug!("GanglionInprocess::transmit called for neuron: {neuron_name}");
        let ganglion_id = self.id;
        let ganglion_name = struct_name_of_type::<Self>().to_string();

        let Some(synapse_lock) = self.get_synapse_by_name(&neuron_name) else {
            tracing::debug!("GanglionInprocess::transmit synapse not found");
            return Box::pin(async move {
                Err(GanglionError::SynapseNotFound {
                    neuron_name,
                    ganglion_name,
                    ganglion_id,
                })
            });
        };

        tracing::debug!("GanglionInprocess::transmit found synapse, acquiring read lock");
        let synapse_guard = synapse_lock.read();
        tracing::debug!(
            "GanglionInprocess::transmit acquired read lock, calling transmit_erased"
        );
        let future = synapse_guard.transmit_erased(payload.clone());

        Box::pin(
            async move {
                tracing::debug!("GanglionInprocess::transmit awaiting transmit_erased future");
                let outcome = future.await;
                tracing::debug!("GanglionInprocess::transmit transmit_erased completed");
                // `map_err` takes an `FnOnce`, so the names can be moved into
                // whichever error is built instead of being cloned.
                outcome.map_err(|err| match err {
                    crate::synapse::SynapseError::QueueFull { .. } => GanglionError::QueueFull {
                        neuron_name,
                        ganglion_name,
                        ganglion_id,
                    },
                    other => GanglionError::Transmit {
                        message: other.to_string(),
                        neuron_name,
                        ganglion_name,
                        ganglion_id,
                    },
                })
            }
            .instrument(payload.span_debug("GanglionInprocess::transmit")),
        )
    }

    /// Attaches `reactants` and `error_reactants` to the synapse registered
    /// under `neuron_name`.
    ///
    /// The registration happens synchronously under the synapse's write lock;
    /// the returned future only carries the outcome.
    ///
    /// # Errors
    ///
    /// [`GanglionError::SynapseNotFound`] when no synapse is registered under
    /// `neuron_name`.
    fn react(
        &mut self,
        neuron_name: String,
        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
        error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
        let Some(synapse_lock) = self.get_synapse_by_name(&neuron_name) else {
            let ganglion_id = self.id;
            let ganglion_name = struct_name_of_type::<Self>().to_string();
            return Box::pin(async move {
                Err(GanglionError::SynapseNotFound {
                    neuron_name,
                    ganglion_name,
                    ganglion_id,
                })
            });
        };
        // The write guard is a temporary released at the end of the statement.
        synapse_lock.write().react_erased(reactants, error_reactants);
        Box::pin(async { Ok(()) })
    }

    /// Attaches reactants for several neurons, synchronously and in the map's
    /// iteration order.
    ///
    /// # Errors
    ///
    /// [`GanglionError::SynapseNotFound`] for the first neuron name without a
    /// registered synapse. Processing stops there; registrations made for
    /// entries visited earlier remain in place.
    fn react_many(
        &mut self,
        reactions: HashMap<
            String,
            (
                Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
                Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
            ),
        >,
    ) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
        for (neuron_name, (reactants, error_reactants)) in reactions {
            let Some(synapse_lock) = self.get_synapse_by_name(&neuron_name) else {
                let ganglion_id = self.id;
                let ganglion_name = struct_name_of_type::<Self>().to_string();
                return Box::pin(async move {
                    Err(GanglionError::SynapseNotFound {
                        neuron_name,
                        ganglion_name,
                        ganglion_id,
                    })
                });
            };
            synapse_lock.write().react_erased(reactants, error_reactants);
        }
        Box::pin(async { Ok(()) })
    }

    /// Returns the id generated when this ganglion was constructed.
    fn unique_id(&self) -> Uuid {
        self.id
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::erasure::payload::{erase_payload, erase_payload_raw};
use crate::erasure::reactant::{
ReactantErased, ReactantRawErased, erase_reactant, erase_reactant_raw,
};
use crate::namespace::NamespaceImpl;
use crate::neuron::{Neuron, NeuronImpl};
use crate::payload::{Payload, PayloadRaw};
use crate::test_utils::{
DebugCodec, DebugStruct, GanglionExternalInprocess, PingCodec, PingMsg, PingNeuron,
TokioMpscReactant, TokioMpscReactantGeneric, TokioMpscReactantRaw, test_namespace,
};
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::channel;
use tokio::task;
use tokio::time::sleep;
use uuid::Uuid;
/// Rendered error messages must mention the neuron name, the ganglion name
/// and, for `Transmit`, the underlying message.
#[test]
fn test_ganglion_error_with_ganglion_name() {
    let ganglion_id = Uuid::now_v7();
    let ganglion_name = struct_name_of_type::<GanglionInprocess>().to_string();
    let neuron_name = String::from("test_neuron");

    let not_found_text = GanglionError::SynapseNotFound {
        neuron_name: neuron_name.clone(),
        ganglion_name: ganglion_name.clone(),
        ganglion_id,
    }
    .to_string();
    for expected in ["test_neuron", "GanglionInprocess"] {
        assert!(not_found_text.contains(expected));
    }

    let lock_text = GanglionError::SynapseLock {
        neuron_name: neuron_name.clone(),
        ganglion_name: ganglion_name.clone(),
        ganglion_id,
    }
    .to_string();
    for expected in ["test_neuron", "GanglionInprocess"] {
        assert!(lock_text.contains(expected));
    }

    let transmit_text = GanglionError::Transmit {
        neuron_name,
        ganglion_name,
        ganglion_id,
        message: String::from("test failure"),
    }
    .to_string();
    for expected in ["test_neuron", "GanglionInprocess", "test failure"] {
        assert!(transmit_text.contains(expected));
    }
}
/// After adapting a neuron, lookups by its name return handles to one shared
/// synapse, and lookups for unknown names return `None`.
#[tokio::test]
async fn test_ganglion_inprocess_get_synapse_by_name() {
    let neuron_impl_instance: NeuronImpl<DebugStruct, DebugCodec> =
        NeuronImpl::new(test_namespace());
    let neuron_name_str = neuron_impl_instance.name();
    let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
        Arc::new(neuron_impl_instance);

    let mut ganglion = GanglionInprocess::new();
    ganglion
        .adapt(Arc::clone(&neuron))
        .await
        .expect("Failed to adapt neuron for test");

    let first_lookup = ganglion.get_synapse_by_name(&neuron_name_str);
    assert!(first_lookup.is_some());
    let second_lookup = ganglion.get_synapse_by_name(&neuron_name_str);
    assert!(second_lookup.is_some());
    assert!(
        Arc::ptr_eq(&first_lookup.unwrap(), &second_lookup.unwrap()),
        "Repeated calls to get_synapse_by_name should yield the same Arc instance."
    );

    assert!(
        ganglion
            .get_synapse_by_name("non_existent_neuron")
            .is_none()
    );
}
// Adapts two differently-typed neurons (PingMsg and DebugStruct) on one
// in-process ganglion, attaches one channel-backed reactant to each, and checks
// that each transmitted payload reaches only the reactant of its own neuron
// with value and correlation id intact.
#[tokio::test]
async fn test_ganglion_inprocess_transmit_via_adapt() {
let ns = test_namespace();
// One channel per neuron type; the reactants forward received payloads here.
let (tx1, mut rx1) = channel::<Arc<Payload<PingMsg, PingCodec>>>(10);
let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
let neuron_arc = neuron_impl.clone_to_arc();
let mut ganglion: GanglionInprocess = GanglionInprocess::new();
ganglion
.adapt(neuron_arc.clone())
.await
.expect("Failed to adapt neuron");
let ping_neuron = Arc::new(PingNeuron::new(test_namespace()));
ganglion
.adapt(ping_neuron.clone())
.await
.expect("Failed to adapt ping neuron");
let reactants1: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> =
vec![erase_reactant::<PingMsg, PingCodec, _>(Box::new(
TokioMpscReactantGeneric::new(tx1.clone()),
))];
let reactants2: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> =
vec![erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(
TokioMpscReactantGeneric::new(tx2.clone()),
))];
ganglion
.react(ping_neuron.name(), reactants1, vec![])
.await
.expect("Failed to react ping");
ganglion
.react(neuron_arc.name(), reactants2, vec![])
.await
.expect("Failed to react debug");
// First payload: a ping message for the ping neuron.
let correlation_uuid1 = Uuid::now_v7();
let payload1 = Payload::builder()
.value(PingMsg { seq: 1 })
.correlation_id(correlation_uuid1)
.neuron(ping_neuron)
.build()
.unwrap();
let erased_payload1 = erase_payload(payload1);
ganglion
.transmit(erased_payload1)
.await
.expect("Failed to transmit payload1");
// The message is expected to be queued already once `transmit` has resolved.
assert_eq!(
rx1.len(),
1,
"Reactant 1 should have received the first message (ping)"
);
let received_p1_ch1 = rx1.recv().await.unwrap();
assert_eq!(
received_p1_ch1.value.seq, 1,
"Payload value mismatch for reactant 1"
);
assert_eq!(
received_p1_ch1.correlation_id(), correlation_uuid1,
"Correlation ID mismatch for reactant 1"
);
// Second payload: a DebugStruct for the debug neuron.
let debug_struct_arc_2 = Arc::new(DebugStruct {
foo: 456,
bar: "ganglion_test_payload_2".to_string(),
});
let correlation_uuid2 = Uuid::now_v7();
let payload2 = Payload::builder()
.value((*debug_struct_arc_2).clone())
.correlation_id(correlation_uuid2)
.neuron(neuron_arc.clone())
.build()
.unwrap();
let erased_payload2 = erase_payload(payload2);
ganglion
.transmit(erased_payload2)
.await
.expect("Failed to transmit payload2");
assert_eq!(
rx2.len(),
1,
"Reactant 2 should have received the second message (debug)"
);
let received_p2_ch1 = rx2.recv().await.unwrap();
assert_eq!(
received_p2_ch1.value, debug_struct_arc_2,
"Second payload value mismatch for reactant 2"
);
assert_eq!(
received_p2_ch1.correlation_id(), correlation_uuid2,
"Second correlation ID mismatch for reactant 2"
);
// Neither channel may hold anything further: no cross-delivery between neurons.
assert_eq!(
rx1.len(),
0,
"Reactant 1 channel should be empty after all expected messages"
);
assert_eq!(
rx2.len(),
0,
"Reactant 2 channel should be empty after all expected messages"
);
}
// Shares one in-process ganglion (behind a tokio mutex) between several spawned
// tasks that each transmit a batch of payloads, and checks that both reactants
// receive every payload exactly once.
#[tokio::test]
async fn test_ganglion_inprocess_across_threads() {
// Senders for both reactant channels plus a counter of received payloads.
struct SharedState {
tx1: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
tx2: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
received_count: std::sync::atomic::AtomicUsize,
}
let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
let shared_state = Arc::new(SharedState {
tx1,
tx2,
received_count: std::sync::atomic::AtomicUsize::new(0),
});
let num_threads = 10;
let payloads_per_thread = 10;
let total_payloads = num_threads * payloads_per_thread;
let mut handles = Vec::new();
let receiver_state = shared_state.clone();
// Receiver task: collects exactly `total_payloads * 2` messages (every payload
// is expected once on each of the two channels) and then returns them.
let receiver_handle = task::spawn(async move {
let mut received_payloads = Vec::new();
for _ in 0..total_payloads * 2 {
tokio::select! {
Some(payload) = rx1.recv() => {
received_payloads.push(payload);
receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
Some(payload) = rx2.recv() => {
received_payloads.push(payload);
receiver_state.received_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
}
}
received_payloads
});
let ns = test_namespace();
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
let neuron = neuron_impl.clone_to_arc();
// Two reactants on the same neuron, one per channel.
let reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
sender: shared_state.tx1.clone(),
})),
erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
sender: shared_state.tx2.clone(),
})),
];
let mut shared_ganglion = GanglionInprocess::new();
shared_ganglion
.adapt(neuron.clone())
.await
.expect("Failed to adapt neuron");
shared_ganglion
.react(neuron.name(), reactants, vec![])
.await
.expect("Failed to react");
let shared_ganglion = Arc::new(tokio::sync::Mutex::new(shared_ganglion));
// Sender tasks: each transmits `payloads_per_thread` payloads with a globally
// unique `foo` value and a fresh correlation id.
for thread_id in 0..num_threads {
let ganglion = shared_ganglion.clone();
let neuron_clone = neuron.clone();
let handle = task::spawn(async move {
for i in 0..payloads_per_thread {
let payload_id = thread_id * payloads_per_thread + i;
let debug_struct = Arc::new(DebugStruct {
foo: payload_id as i32,
bar: format!("thread_{thread_id}_payload_{i}"),
});
let correlation_uuid = Uuid::now_v7();
let payload = Payload::builder()
.value((*debug_struct).clone())
.correlation_id(correlation_uuid)
.neuron(neuron_clone.clone())
.build()
.unwrap();
sleep(Duration::from_millis(1)).await;
let mut ganglion_guard = ganglion.lock().await;
let erased_payload = erase_payload(payload);
// NOTE(review): the transmit result is discarded. If a transmit ever failed,
// the receiver task above would presumably wait forever for the missing
// messages instead of the test failing — confirm whether this is intended.
let _ = ganglion_guard.transmit(erased_payload).await;
}
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
let received_payloads = receiver_handle.await.unwrap();
assert_eq!(
shared_state
.received_count
.load(std::sync::atomic::Ordering::SeqCst),
total_payloads * 2,
"Should have received all payloads on both reactants"
);
// Every distinct `foo` value in 0..total_payloads must have been seen.
let mut foo_values = received_payloads
.iter()
.map(|p| p.value.foo)
.collect::<Vec<_>>();
foo_values.sort();
foo_values.dedup();
assert_eq!(
foo_values.len(),
total_payloads,
"Should have received payloads with all expected foo values"
);
for i in 0..total_payloads {
assert!(
foo_values.contains(&(i as i32)),
"Should have received a payload with foo={i}"
);
}
// Every correlation id must appear exactly twice: once per reactant.
let mut correlation_ids = received_payloads
.iter()
.map(|p| p.correlation_id())
.collect::<Vec<_>>();
correlation_ids.sort();
let mut correlation_id_counts = std::collections::HashMap::new();
for id in &correlation_ids {
*correlation_id_counts.entry(*id).or_insert(0) += 1;
}
assert_eq!(
correlation_id_counts.len(),
total_payloads,
"Should have received payloads with all expected correlation_ids"
);
for (id, count) in correlation_id_counts {
assert_eq!(
count, 2,
"Correlation ID {id} should appear exactly twice (once from each reactant)"
);
}
}
/// Two external ganglia get different ids, and an id is stable across calls.
#[tokio::test]
async fn test_ganglion_external_unique_id() {
    use crate::test_utils::GanglionExternalInprocess;

    let first = GanglionExternalInprocess::new();
    let second = GanglionExternalInprocess::new();

    let first_id = first.unique_id();
    assert_ne!(first_id, second.unique_id());
    assert_eq!(first_id, first.unique_id());
}
/// With an allow-list, only the listed neuron name is accepted.
#[tokio::test]
async fn test_ganglion_inprocess_capable_with_relevant_neurons() {
    let namespace_one = Arc::new(NamespaceImpl {
        delimiter: ".",
        parts: vec!["dev", "plexo", "1"],
    });
    let namespace_two = Arc::new(NamespaceImpl {
        delimiter: ".",
        parts: vec!["dev", "plexo", "2"],
    });
    let listed_neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(namespace_one));
    let unlisted_neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(namespace_two));

    let relevant_neurons = HashSet::from(["dev.plexo.1.DebugStruct.debug".to_string()]);
    let mut ganglion = GanglionInprocess::new_with_filters(relevant_neurons, HashSet::new());

    assert!(ganglion.capable(listed_neuron.clone()));
    assert!(!ganglion.capable(unlisted_neuron.clone()));
}
/// With a deny-list, the listed neuron name is rejected and others accepted.
#[tokio::test]
async fn test_ganglion_inprocess_capable_with_ignored_neurons() {
    let namespace_one = Arc::new(NamespaceImpl {
        delimiter: ".",
        parts: vec!["dev", "plexo", "1"],
    });
    let namespace_two = Arc::new(NamespaceImpl {
        delimiter: ".",
        parts: vec!["dev", "plexo", "2"],
    });
    let ignored_neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(namespace_one));
    let other_neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(namespace_two));

    let ignored_neurons = HashSet::from(["dev.plexo.1.DebugStruct.debug".to_string()]);
    let mut ganglion = GanglionInprocess::new_with_filters(HashSet::new(), ignored_neurons);

    assert!(!ganglion.capable(ignored_neuron.clone()));
    assert!(ganglion.capable(other_neuron.clone()));
}
/// External ganglion: with an allow-list, only the listed neuron name is
/// accepted.
#[tokio::test]
async fn test_ganglion_external_inprocess_capable_with_relevant_neurons() {
    let namespace_one = Arc::new(NamespaceImpl {
        delimiter: ".",
        parts: vec!["dev", "plexo", "1"],
    });
    let namespace_two = Arc::new(NamespaceImpl {
        delimiter: ".",
        parts: vec!["dev", "plexo", "2"],
    });
    let listed_neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(namespace_one));
    let unlisted_neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(namespace_two));

    let relevant_neurons = HashSet::from(["dev.plexo.1.DebugStruct.debug".to_string()]);
    let mut ganglion =
        GanglionExternalInprocess::new_with_filters(relevant_neurons, HashSet::new());

    assert!(ganglion.capable(listed_neuron.clone()));
    assert!(!ganglion.capable(unlisted_neuron.clone()));
}
/// External ganglion: with a deny-list, the listed neuron name is rejected and
/// others accepted.
#[tokio::test]
async fn test_ganglion_external_inprocess_capable_with_ignored_neurons() {
    let namespace_one = Arc::new(NamespaceImpl {
        delimiter: ".",
        parts: vec!["dev", "plexo", "1"],
    });
    let namespace_two = Arc::new(NamespaceImpl {
        delimiter: ".",
        parts: vec!["dev", "plexo", "2"],
    });
    let ignored_neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(namespace_one));
    let other_neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(namespace_two));

    let ignored_neurons = HashSet::from(["dev.plexo.1.DebugStruct.debug".to_string()]);
    let mut ganglion =
        GanglionExternalInprocess::new_with_filters(HashSet::new(), ignored_neurons);

    assert!(!ganglion.capable(ignored_neuron.clone()));
    assert!(ganglion.capable(other_neuron.clone()));
}
/// Without any filters, an in-process ganglion accepts a neuron.
#[tokio::test]
async fn test_ganglion_inprocess_capable_default_behavior() {
    let namespace = Arc::new(NamespaceImpl {
        delimiter: ".",
        parts: vec!["dev", "plexo"],
    });
    let neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(namespace));

    let mut unfiltered = GanglionInprocess::new();
    let accepted = unfiltered.capable(neuron.clone());
    assert!(accepted);
}
/// Without any filters, an external in-process ganglion accepts a neuron.
#[tokio::test]
async fn test_ganglion_external_inprocess_capable_default_behavior() {
    let namespace = test_namespace();
    let neuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(namespace));

    let mut unfiltered = GanglionExternalInprocess::new();
    let accepted = unfiltered.capable(neuron.clone());
    assert!(accepted);
}
/// Transmitting an encoded payload for an adapted neuron that has no
/// reactants succeeds and yields two empty result vectors.
#[tokio::test]
async fn test_ganglion_external_transmit_encoded() {
    let mut ganglion = GanglionExternalInprocess::new();
    let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(test_namespace());
    let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
        Arc::new(neuron_impl.clone());
    ganglion
        .adapt(neuron.clone())
        .await
        .expect("Failed to adapt neuron");

    let value = DebugStruct {
        foo: 42,
        bar: "test_value".to_owned(),
    };
    let correlation_id = Uuid::now_v7();
    let encoded = neuron_impl
        .encode(&value)
        .expect("Encoding should succeed in test");
    let erased_payload = erase_payload_raw(PayloadRaw::with_correlation(
        encoded,
        neuron.clone(),
        Some(correlation_id),
    ));

    let (first_results, second_results) = ganglion
        .transmit_encoded(erased_payload)
        .await
        .expect("Failed to transmit encoded payload");
    assert_eq!(first_results.len(), 0);
    assert_eq!(second_results.len(), 0);
}
/// Adapting a neuron on an external in-process ganglion succeeds.
#[tokio::test]
async fn test_ganglion_external_adapt() {
    let mut ganglion = GanglionExternalInprocess::new();
    let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
        Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(test_namespace()));

    let outcome = ganglion.adapt(neuron).await;
    outcome.expect("Failed to adapt neuron");
}
// Attaches two decoded reactants and two raw reactants to one neuron on an
// external in-process ganglion, transmits two encoded payloads, and checks that
// every reactant receives each payload with matching correlation id (and, for
// the decoded reactants, matching value).
#[tokio::test]
async fn test_ganglion_external_inprocess_transmit_via_adapt() {
let ns = test_namespace();
// Channels for the decoded reactants (tx1/tx2) and the raw reactants
// (raw_tx1/raw_tx2).
let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(2);
let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(2);
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
let neuron_arc = neuron_impl.clone_to_arc();
let reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
sender: tx1.clone(),
})),
erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
sender: tx2.clone(),
})),
];
let raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
sender: raw_tx1.clone(),
})),
erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
sender: raw_tx2.clone(),
})),
];
let mut ganglion: GanglionExternalInprocess = GanglionExternalInprocess::new();
ganglion
.adapt(neuron_arc.clone())
.await
.expect("Failed to adapt neuron");
ganglion
.react(neuron_arc.name(), reactants, raw_reactants, vec![])
.await
.expect("Failed to react");
// First payload: encode a DebugStruct and transmit it in raw form.
let debug_struct_arc = Arc::new(DebugStruct {
foo: 123,
bar: "ganglion_external_test_payload_1".to_string(),
});
let correlation_uuid1 = Uuid::now_v7();
let encoded = neuron_impl
.encode(debug_struct_arc.as_ref())
.expect("Encoding should succeed in test");
let payload_raw1 = PayloadRaw::with_correlation(
encoded,
neuron_arc.clone(),
Some(correlation_uuid1),
);
let erased_payload1 = erase_payload_raw(payload_raw1);
ganglion
.transmit_encoded(erased_payload1)
.await
.expect("Failed to transmit encoded payload1");
// Each receive is bounded by a 100 ms timeout so a missing delivery fails the
// test instead of hanging it.
let received_raw_p1_ch1 =
tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx1.recv())
.await
.expect("Timeout raw_rx1")
.expect("Closed raw_rx1");
assert_eq!(
received_raw_p1_ch1.correlation_id(), correlation_uuid1,
"Raw correlation ID mismatch for reactant 1"
);
let received_raw_p1_ch2 =
tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx2.recv())
.await
.expect("Timeout raw_rx2")
.expect("Closed raw_rx2");
assert_eq!(
received_raw_p1_ch2.correlation_id(), correlation_uuid1,
"Raw correlation ID mismatch for reactant 2"
);
let received_p1_ch1 =
tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
.await
.expect("Timeout rx1")
.expect("Closed rx1");
assert_eq!(
received_p1_ch1.value, debug_struct_arc,
"Payload value mismatch for reactant 1"
);
assert_eq!(
received_p1_ch1.correlation_id(), correlation_uuid1,
"Correlation ID mismatch for reactant 1"
);
let received_p1_ch2 =
tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
.await
.expect("Timeout rx2")
.expect("Closed rx2");
assert_eq!(
received_p1_ch2.value, debug_struct_arc,
"Payload value mismatch for reactant 2"
);
assert_eq!(
received_p1_ch2.correlation_id(), correlation_uuid1,
"Correlation ID mismatch for reactant 2"
);
// Second payload: same flow with different content and correlation id.
let debug_struct_arc_2 = Arc::new(DebugStruct {
foo: 456,
bar: "ganglion_external_test_payload_2".to_string(),
});
let correlation_uuid2 = Uuid::now_v7();
let encoded2 = neuron_impl
.encode(debug_struct_arc_2.as_ref())
.expect("Encoding should succeed in test");
let payload_raw2 = PayloadRaw::with_correlation(
encoded2,
neuron_arc.clone(),
Some(correlation_uuid2),
);
let erased_payload2 = erase_payload_raw(payload_raw2);
ganglion
.transmit_encoded(erased_payload2)
.await
.expect("Failed to transmit encoded payload2");
let received_raw_p2_ch1 =
tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx1.recv())
.await
.expect("Timeout raw_rx1_2")
.expect("Closed raw_rx1_2");
assert_eq!(
received_raw_p2_ch1.correlation_id(), correlation_uuid2,
"Second raw correlation ID mismatch for reactant 1"
);
let received_raw_p2_ch2 =
tokio::time::timeout(std::time::Duration::from_millis(100), raw_rx2.recv())
.await
.expect("Timeout raw_rx2_2")
.expect("Closed raw_rx2_2");
assert_eq!(
received_raw_p2_ch2.correlation_id(), correlation_uuid2,
"Second raw correlation ID mismatch for reactant 2"
);
let received_p2_ch1 =
tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
.await
.expect("Timeout rx1_2")
.expect("Closed rx1_2");
assert_eq!(
received_p2_ch1.value, debug_struct_arc_2,
"Second payload value mismatch for reactant 1"
);
assert_eq!(
received_p2_ch1.correlation_id(), correlation_uuid2,
"Second correlation ID mismatch for reactant 1"
);
let received_p2_ch2 =
tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
.await
.expect("Timeout rx2_2")
.expect("Closed rx2_2");
assert_eq!(
received_p2_ch2.value, debug_struct_arc_2,
"Second payload value mismatch for reactant 2"
);
assert_eq!(
received_p2_ch2.correlation_id(), correlation_uuid2,
"Second correlation ID mismatch for reactant 2"
);
// No channel may hold anything beyond the two expected messages.
assert_eq!(
rx1.len(),
0,
"Reactant 1 channel should be empty after all expected messages"
);
assert_eq!(
rx2.len(),
0,
"Reactant 2 channel should be empty after all expected messages"
);
assert_eq!(
raw_rx1.len(),
0,
"Raw reactant 1 channel should be empty after all expected messages"
);
assert_eq!(
raw_rx2.len(),
0,
"Raw reactant 2 channel should be empty after all expected messages"
);
}
/// Adapting a neuron makes a synapse available under the neuron's name.
#[tokio::test]
async fn test_ganglion_inprocess_adapt_erased() {
    let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + 'static> =
        Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(test_namespace()));

    let mut ganglion = GanglionInprocess::new();
    ganglion
        .adapt(Arc::clone(&neuron))
        .await
        .expect("Failed to adapt neuron");

    let registered = ganglion.get_synapse_by_name(&neuron.name()).is_some();
    assert!(registered);
}
/// Drives one shared `GanglionExternalInprocess` from several concurrent tokio
/// tasks and checks that every encoded payload reaches every reactant.
///
/// Ten sender tasks each transmit ten encoded payloads through
/// `transmit_encoded`. Two regular reactants and two raw reactants are
/// registered for the neuron, so each payload is expected exactly once on each
/// of the four channels: twice in decoded form and twice in raw form.
#[tokio::test]
async fn test_ganglion_external_inprocess_across_threads() {
    use crate::ganglion::GanglionExternal;
    use crate::neuron::NeuronImpl;
    use crate::payload::PayloadRaw;
    use crate::test_utils::{
        DebugCodec, DebugStruct, GanglionExternalInprocess, TokioMpscReactant,
        TokioMpscReactantRaw, test_namespace,
    };
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::sync::mpsc::channel;
    use tokio::task;
    use tokio::time::{Duration, sleep};
    use uuid::Uuid;

    // Senders and delivery counters shared between the test body and the receiver task.
    struct SharedState {
        tx1: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
        tx2: tokio::sync::mpsc::Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
        raw_tx1: tokio::sync::mpsc::Sender<Arc<PayloadRaw<DebugStruct, DebugCodec>>>,
        raw_tx2: tokio::sync::mpsc::Sender<Arc<PayloadRaw<DebugStruct, DebugCodec>>>,
        received_count: AtomicUsize,
        raw_received_count: AtomicUsize,
    }

    let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
    let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(100);
    let (raw_tx1, mut raw_rx1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(100);
    let (raw_tx2, mut raw_rx2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(100);
    let shared_state = Arc::new(SharedState {
        tx1,
        tx2,
        raw_tx1,
        raw_tx2,
        received_count: AtomicUsize::new(0),
        raw_received_count: AtomicUsize::new(0),
    });

    let num_threads = 10;
    let payloads_per_thread = 10;
    let total_payloads = num_threads * payloads_per_thread;
    // Upper bound on how long the receiver may take once every sender has finished.
    let delivery_timeout = Duration::from_secs(5);

    // Receiver task: pulls exactly one message per expected delivery
    // (`total_payloads` payloads on each of the four channels).
    let receiver_state = shared_state.clone();
    let receiver_handle = task::spawn(async move {
        let mut received_payloads = Vec::new();
        let mut received_raw_payloads = Vec::new();
        for _ in 0..total_payloads * 4 {
            tokio::select! {
                Some(payload) = rx1.recv() => {
                    received_payloads.push(payload);
                    receiver_state.received_count.fetch_add(1, Ordering::SeqCst);
                }
                Some(payload) = rx2.recv() => {
                    received_payloads.push(payload);
                    receiver_state.received_count.fetch_add(1, Ordering::SeqCst);
                }
                Some(payload) = raw_rx1.recv() => {
                    received_raw_payloads.push(payload);
                    receiver_state.raw_received_count.fetch_add(1, Ordering::SeqCst);
                }
                Some(payload) = raw_rx2.recv() => {
                    received_raw_payloads.push(payload);
                    receiver_state.raw_received_count.fetch_add(1, Ordering::SeqCst);
                }
            }
        }
        (received_payloads, received_raw_payloads)
    });

    let ns = test_namespace();
    let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
    let neuron = neuron_impl.clone_to_arc();

    let reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = vec![
        erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
            sender: shared_state.tx1.clone(),
        })),
        erase_reactant::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactant {
            sender: shared_state.tx2.clone(),
        })),
    ];
    let raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>> = vec![
        erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
            sender: shared_state.raw_tx1.clone(),
        })),
        erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(TokioMpscReactantRaw {
            sender: shared_state.raw_tx2.clone(),
        })),
    ];

    let mut shared_ganglion = GanglionExternalInprocess::new();
    shared_ganglion
        .adapt(neuron.clone())
        .await
        .expect("Failed to adapt neuron");
    shared_ganglion
        .react(neuron.name(), reactants, raw_reactants, vec![])
        .await
        .expect("Failed to react");
    // All sender tasks go through the same ganglion, serialized by an async mutex.
    let shared_ganglion = Arc::new(tokio::sync::Mutex::new(shared_ganglion));

    let mut handles = Vec::new();
    for thread_id in 0..num_threads {
        let ganglion = shared_ganglion.clone();
        let neuron_clone = neuron.clone();
        let neuron_impl_clone = neuron_impl.clone();
        let handle = task::spawn(async move {
            for i in 0..payloads_per_thread {
                // `foo` is unique across all tasks so deliveries can be told apart later.
                let payload_id = thread_id * payloads_per_thread + i;
                let debug_struct = Arc::new(DebugStruct {
                    foo: payload_id as i32,
                    bar: format!("external_thread_{thread_id}_payload_{i}"),
                });
                let correlation_uuid = Uuid::now_v7();
                let encoded = neuron_impl_clone
                    .encode(debug_struct.as_ref())
                    .expect("Encoding should succeed in test");
                let payload_raw = PayloadRaw::with_correlation(
                    encoded,
                    neuron_clone.clone(),
                    Some(correlation_uuid),
                );
                // Short pause before taking the lock so the sender tasks interleave.
                sleep(Duration::from_millis(1)).await;
                let mut ganglion_guard = ganglion.lock().await;
                let erased_payload = erase_payload_raw(payload_raw);
                ganglion_guard
                    .transmit_encoded(erased_payload)
                    .await
                    .expect("Failed to transmit encoded payload");
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.await.expect("Sender task panicked");
    }

    // The senders stay alive inside `shared_state` for the whole test, so the
    // channels never close and `recv()` never yields `None`. Without a bound a
    // single lost payload would leave the receiver pending forever; the timeout
    // turns that hang into a test failure.
    let (received_payloads, received_raw_payloads) =
        tokio::time::timeout(delivery_timeout, receiver_handle)
            .await
            .expect("Timed out waiting for all payloads to be delivered")
            .expect("Receiver task panicked");

    assert_eq!(
        shared_state.received_count.load(Ordering::SeqCst),
        total_payloads * 2,
        "Should have received all decoded payloads on both regular reactants"
    );
    assert_eq!(
        shared_state.raw_received_count.load(Ordering::SeqCst),
        total_payloads * 2,
        "Should have received all raw payloads on both raw reactants"
    );

    // Every `foo` in 0..total_payloads must have been decoded at least once.
    let mut foo_values: Vec<i32> = received_payloads.iter().map(|p| p.value.foo).collect();
    foo_values.sort_unstable();
    foo_values.dedup();
    assert_eq!(
        foo_values.len(),
        total_payloads,
        "Should have received payloads with all expected foo values"
    );
    for i in 0..total_payloads {
        // `foo_values` is sorted and deduplicated, so a binary search is enough.
        assert!(
            foo_values.binary_search(&(i as i32)).is_ok(),
            "Should have received a payload with foo={i}"
        );
    }

    // Each correlation ID must show up once per regular reactant.
    let mut correlation_id_counts = std::collections::HashMap::new();
    for payload in &received_payloads {
        *correlation_id_counts
            .entry(payload.correlation_id())
            .or_insert(0usize) += 1;
    }
    assert_eq!(
        correlation_id_counts.len(),
        total_payloads,
        "Should have received payloads with all expected correlation_ids"
    );
    for (id, count) in correlation_id_counts {
        assert_eq!(
            count, 2,
            "Correlation ID {id} should appear exactly twice (once from each regular reactant)"
        );
    }

    // Same check for the raw reactants.
    let mut raw_correlation_id_counts = std::collections::HashMap::new();
    for payload in &received_raw_payloads {
        *raw_correlation_id_counts
            .entry(payload.correlation_id())
            .or_insert(0usize) += 1;
    }
    assert_eq!(
        raw_correlation_id_counts.len(),
        total_payloads,
        "Should have received raw payloads with all expected correlation_ids"
    );
    for (id, count) in raw_correlation_id_counts {
        assert_eq!(
            count, 2,
            "Correlation ID {id} should appear exactly twice (once from each raw reactant)"
        );
    }
}
}