use plexor_core::erasure::payload::erase_payload;
use plexor_core::erasure::reactant::erase_reactant;
use plexor_core::ganglion::{GanglionInprocess, GanglionInternal};
use plexor_core::neuron::{Neuron, NeuronImpl};
use plexor_core::payload::Payload;
use plexor_core::reactant::Reactant;
use plexor_core::test_utils::{DebugCodec, DebugStruct, test_namespace};
use plexor_core::thalamus::Thalamus;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::mpsc::{Sender, channel};
use uuid::Uuid;
#[derive(Clone)]
struct DistributionReactant {
id: usize,
tx: Sender<usize>,
}
impl Reactant<DebugStruct, DebugCodec> for DistributionReactant {
fn react(
&self,
_payload: Arc<Payload<DebugStruct, DebugCodec>>,
) -> Pin<
Box<dyn Future<Output = Result<(), plexor_core::reactant::ReactantError>> + Send + 'static>,
> {
let tx = self.tx.clone();
let id = self.id;
Box::pin(async move {
let _ = tx.try_send(id);
Ok(())
})
}
fn erase(
self: Box<Self>,
) -> Arc<dyn plexor_core::erasure::reactant::ReactantErased + Send + Sync + 'static> {
erase_reactant(self)
}
}
#[tokio::test]
async fn test_thalamus_load_balancing_and_tracing() {
let ns = test_namespace();
let neuron_impl = NeuronImpl::<DebugStruct, DebugCodec>::new(ns);
let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> = Arc::new(neuron_impl);
let neuron_name = neuron_arc.name();
let g1 = Arc::new(Mutex::new(GanglionInprocess::new()));
let g2 = Arc::new(Mutex::new(GanglionInprocess::new()));
let g3 = Arc::new(Mutex::new(GanglionInprocess::new()));
let ganglia = [g1.clone(), g2.clone(), g3.clone()];
plexor_core::ganglion::adapt_all(&ganglia, neuron_arc.clone())
.await
.expect("Adapt all failed");
let (tx, mut rx) = channel(30);
for (i, g) in ganglia.iter().enumerate() {
let mut guard = g.lock().await;
let reactant = DistributionReactant {
id: i,
tx: tx.clone(),
};
guard
.react(neuron_name.clone(), vec![reactant.new_erased()], vec![])
.await
.expect("React failed");
}
let mut thalamus = Thalamus::new(vec![g1, g2, g3]);
let _correlation_id = Uuid::now_v7();
for i in 0..9 {
let payload = Payload::with_correlation(
DebugStruct {
foo: i,
bar: "load balanced msg".to_string(),
},
neuron_arc.clone(),
Some(_correlation_id),
);
thalamus
.transmit(erase_payload(payload))
.await
.expect("Transmit failed");
}
let mut counts = [0, 0, 0];
for _ in 0..9 {
let id = rx.recv().await.expect("No message");
counts[id] += 1;
}
assert_eq!(counts[0], 3);
assert_eq!(counts[1], 3);
assert_eq!(counts[2], 3);
}