use plexor_core::erasure::neuron::erase_neuron;
use plexor_core::erasure::payload::erase_payload;
use plexor_core::erasure::reactant::erase_reactant;
use plexor_core::ganglion::{Ganglion, GanglionInternal};
use plexor_core::neuron::{Neuron, NeuronImpl};
use plexor_core::payload::Payload;
use plexor_core::plexus::Plexus;
use plexor_core::reactant::Reactant;
use plexor_core::test_utils::{DebugCodec, DebugStruct, test_namespace};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{Sender, channel};
use uuid::Uuid;
/// Reactant used by the integration test below.
///
/// It holds the sending half of a tokio mpsc channel so that every payload
/// handed to [`Reactant::react`] can be observed by the test body through the
/// matching receiver.
#[derive(Clone)]
struct TestReactant {
    /// Sending half of the capture channel; carries the shared payloads
    /// exactly as they were delivered to the reactant.
    tx: Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
}
impl Reactant<DebugStruct, DebugCodec> for TestReactant {
    /// Forwards the received payload into the capture channel.
    ///
    /// The returned future always resolves to `Ok(())`: the outcome of
    /// `try_send` is discarded on purpose, so a payload that cannot be
    /// queued is simply dropped instead of being reported as a reactant
    /// failure.
    fn react(
        &self,
        payload: Arc<Payload<DebugStruct, DebugCodec>>,
    ) -> Pin<
        Box<dyn Future<Output = Result<(), plexor_core::reactant::ReactantError>> + Send + 'static>,
    > {
        // The future must be `'static`, so it owns its own sender handle
        // rather than borrowing from `self`.
        let sender = self.tx.clone();
        let forward = async move {
            // Non-blocking, best-effort delivery; the result is intentionally ignored.
            let _ = sender.try_send(payload);
            Ok(())
        };
        Box::pin(forward)
    }

    /// Converts this boxed reactant into its type-erased form by delegating
    /// to `erase_reactant`.
    fn erase(
        self: Box<Self>,
    ) -> Arc<dyn plexor_core::erasure::reactant::ReactantErased + Send + Sync + 'static> {
        erase_reactant(self)
    }
}
/// End-to-end check: a payload transmitted through a `Plexus` must reach the
/// attached `TestReactant` with its value, correlation id and span id
/// unchanged, and without a parent id.
#[tokio::test]
async fn test_plexus_full_integration_and_tracing() {
    // --- Arrange -----------------------------------------------------------
    // Capture channel: the reactant pushes into it, the test reads from it.
    let (payload_tx, mut payload_rx) = channel(10);
    let probe = TestReactant { tx: payload_tx };

    let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> =
        Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(test_namespace()));
    let name = neuron.name();

    let mut plexus = Plexus::new(vec![erase_neuron(neuron.clone())], vec![]).await;

    let adapted = plexus.adapt(neuron.clone()).await;
    adapted.expect("Adapt failed");

    let attached = plexus
        .react(name.clone(), vec![probe.new_erased()], vec![])
        .await;
    attached.expect("React failed");

    // --- Act ---------------------------------------------------------------
    let correlation_id = Uuid::now_v7();
    // The cast keeps only the low 64 bits of a freshly generated UUID.
    let span_id = Uuid::now_v7().as_u128() as u64;

    let message = DebugStruct {
        foo: 1,
        bar: "test message".to_string(),
    };
    let payload = Payload::builder()
        .value(message)
        .neuron(neuron.clone())
        .correlation_id(correlation_id)
        .span_id(span_id)
        .build()
        .expect("Failed to build payload");

    let erased = erase_payload(payload);
    plexus.transmit(erased).await.expect("Transmit failed");

    // --- Assert ------------------------------------------------------------
    // Bound the wait so a missing delivery fails the test instead of hanging it.
    let wait = Duration::from_millis(100);
    let received = tokio::time::timeout(wait, payload_rx.recv())
        .await
        .expect("Timeout")
        .expect("No message");

    assert_eq!(received.value.foo, 1);
    assert_eq!(received.correlation_id(), correlation_id);
    assert_eq!(received.span_id(), span_id);
    assert!(received.parent_id().is_none());
}