use plexor_core::codec::{Codec, CodecError, CodecName};
use plexor_core::erasure::neuron::erase_neuron;
use plexor_core::erasure::payload::erase_payload;
use plexor_core::erasure::reactant::erase_reactant;
use plexor_core::ganglion::{Ganglion, GanglionInternal};
use plexor_core::namespace::NamespaceImpl;
use plexor_core::neuron::{Neuron, NeuronImpl};
use plexor_core::payload::Payload;
use plexor_core::plexus::Plexus;
use plexor_core::reactant::Reactant;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::mpsc::{Sender, channel};
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
struct AlphaMsg {
value: i32,
}
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
struct BetaMsg {
text: String,
}
struct UniversalTestCodec;
impl CodecName for UniversalTestCodec {
fn name() -> &'static str {
"universal_test"
}
}
impl Codec<AlphaMsg> for UniversalTestCodec {
fn encode(_data: &AlphaMsg) -> Result<Vec<u8>, CodecError> {
Ok(vec![])
}
fn decode(_data: &[u8]) -> Result<AlphaMsg, CodecError> {
Ok(AlphaMsg { value: 123 })
}
}
impl Codec<BetaMsg> for UniversalTestCodec {
fn encode(_data: &BetaMsg) -> Result<Vec<u8>, CodecError> {
Ok(vec![])
}
fn decode(_data: &[u8]) -> Result<BetaMsg, CodecError> {
Ok(BetaMsg {
text: "Hello Beta".to_string(),
})
}
}
#[derive(Clone)]
struct AlphaReactant {
tx: Sender<i32>,
}
impl Reactant<AlphaMsg, UniversalTestCodec> for AlphaReactant {
fn react(
&self,
payload: Arc<Payload<AlphaMsg, UniversalTestCodec>>,
) -> Pin<
Box<dyn Future<Output = Result<(), plexor_core::reactant::ReactantError>> + Send + 'static>,
> {
let tx = self.tx.clone();
let val = payload.value.value;
Box::pin(async move {
let _ = tx.try_send(val);
Ok(())
})
}
fn erase(
self: Box<Self>,
) -> Arc<dyn plexor_core::erasure::reactant::ReactantErased + Send + Sync + 'static> {
erase_reactant(self)
}
}
#[derive(Clone)]
struct BetaReactant {
tx: Sender<String>,
}
impl Reactant<BetaMsg, UniversalTestCodec> for BetaReactant {
fn react(
&self,
payload: Arc<Payload<BetaMsg, UniversalTestCodec>>,
) -> Pin<
Box<dyn Future<Output = Result<(), plexor_core::reactant::ReactantError>> + Send + 'static>,
> {
let tx = self.tx.clone();
let val = payload.value.text.clone();
Box::pin(async move {
let _ = tx.try_send(val);
Ok(())
})
}
fn erase(
self: Box<Self>,
) -> Arc<dyn plexor_core::erasure::reactant::ReactantErased + Send + Sync + 'static> {
erase_reactant(self)
}
}
#[tokio::test]
async fn test_universal_plexus_routing_multiple_types() {
let ns = Arc::new(NamespaceImpl {
delimiter: ".",
parts: vec!["test"],
});
let neuron_alpha = Arc::new(NeuronImpl::<AlphaMsg, UniversalTestCodec>::new(ns.clone()));
let neuron_beta = Arc::new(NeuronImpl::<BetaMsg, UniversalTestCodec>::new(ns.clone()));
let mut plexus = Plexus::new(
vec![
erase_neuron(neuron_alpha.clone()),
erase_neuron(neuron_beta.clone()),
],
vec![],
)
.await;
plexus
.adapt(neuron_alpha.clone())
.await
.expect("Adapt Alpha failed");
plexus
.adapt(neuron_beta.clone())
.await
.expect("Adapt Beta failed");
let (tx_a, mut rx_alpha) = channel(10);
let (tx_b, mut rx_beta) = channel(10);
plexus
.react(
neuron_alpha.name(),
vec![Box::new(AlphaReactant { tx: tx_a }).erase()],
vec![],
)
.await
.expect("React Alpha failed");
plexus
.react(
neuron_beta.name(),
vec![Box::new(BetaReactant { tx: tx_b }).erase()],
vec![],
)
.await
.expect("React Beta failed");
let payload_alpha = Payload::new(AlphaMsg { value: 123 }, neuron_alpha.clone());
plexus
.transmit(erase_payload(payload_alpha))
.await
.expect("Transmit Alpha failed");
let payload_beta = Payload::new(
BetaMsg {
text: "Hello Beta".to_string(),
},
neuron_beta.clone(),
);
plexus
.transmit(erase_payload(payload_beta))
.await
.expect("Transmit Beta failed");
let val_a = rx_alpha.recv().await.expect("No Alpha message");
assert_eq!(val_a, 123);
let val_b = rx_beta.recv().await.expect("No Beta message");
assert_eq!(val_b, "Hello Beta");
assert!(rx_alpha.try_recv().is_err());
assert!(rx_beta.try_recv().is_err());
}