use plexor_core::axon::{Axon, AxonImpl};
use plexor_core::erasure::reactant::{
ErrorReactantErased, ReactantErased, erase_error_reactant, erase_reactant,
};
use plexor_core::ganglion::{Ganglion, GanglionInternal};
use plexor_core::namespace::NamespaceImpl;
use plexor_core::neuron::{Neuron, NeuronImpl};
use plexor_core::payload::Payload;
use plexor_core::plexus::Plexus;
use plexor_core::reactant::{ErrorReactant, Reactant, ReactantError};
use plexor_core::test_utils::{DebugCodec, DebugStruct};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
/// Test reactant whose reaction never succeeds.
///
/// Every call to [`Reactant::react`] yields a future that resolves to
/// `ReactantError::Execution("Intentional Failure")`, regardless of the payload.
#[derive(Clone)]
struct FailingReactant;

impl Reactant<DebugStruct, DebugCodec> for FailingReactant {
    /// Ignores the payload and returns a future that resolves to an execution error.
    fn react(
        &self,
        _payload: Arc<Payload<DebugStruct, DebugCodec>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), ReactantError>> + Send + 'static>> {
        // The error is constructed inside the future, i.e. only once it is polled.
        Box::pin(async {
            let reason = String::from("Intentional Failure");
            Err(ReactantError::Execution(reason))
        })
    }

    /// Converts this reactant into its type-erased form via `erase_reactant`.
    fn erase(self: Box<Self>) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
        erase_reactant::<DebugStruct, DebugCodec, FailingReactant>(self)
    }
}
/// Test error reactant that forwards what it observed over a channel.
///
/// For each error it is given, it sends the error's string form together with
/// the `foo` field of the payload that was being processed.
#[derive(Clone)]
struct PayloadCapturingErrorReactant {
    /// Channel end on which `(error text, payload.value.foo)` pairs are reported.
    sender: tokio::sync::mpsc::Sender<(String, i32)>,
}

impl ErrorReactant<DebugStruct, DebugCodec> for PayloadCapturingErrorReactant {
    /// Returns a future that reports the error text and the payload's `foo` value
    /// through `sender`.
    fn react_error(
        &self,
        error: Arc<ReactantError>,
        payload: Arc<Payload<DebugStruct, DebugCodec>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        // Clone the sender so the returned future owns its own handle.
        let report_tx = self.sender.clone();
        Box::pin(async move {
            let report = (error.to_string(), payload.value.foo);
            // The outcome of the send is deliberately discarded.
            let _ = report_tx.send(report).await;
        })
    }

    /// Converts this error reactant into its type-erased form via `erase_error_reactant`.
    fn erase_error(self: Box<Self>) -> Arc<dyn ErrorReactantErased + Send + Sync + 'static> {
        erase_error_reactant::<DebugStruct, DebugCodec, PayloadCapturingErrorReactant>(self)
    }
}
/// Checks that an error reactant is handed both the failure and the payload that caused it.
///
/// A `FailingReactant` and a `PayloadCapturingErrorReactant` are registered for one neuron,
/// a `DebugStruct` with `foo == 42` is transmitted, and the test then waits (up to one
/// second) for the error reactant to report the error text and the payload's `foo`.
#[tokio::test]
async fn test_error_reactant_receives_payload_context() {
    use std::time::Duration;

    // Build the neuron and make it known to a fresh plexus.
    let namespace = Arc::new(NamespaceImpl {
        delimiter: ".",
        parts: vec!["test", "error"],
    });
    let neuron = NeuronImpl::<DebugStruct, DebugCodec>::new_arc(namespace);
    let plexus = Plexus::new_shared(vec![], vec![]).await;
    // The lock guard is a temporary here, so it is released at the end of the statement.
    plexus
        .lock()
        .await
        .adapt(neuron.clone())
        .await
        .expect("adapt failed");

    // Register the always-failing reactant alongside the capturing error reactant.
    let (report_tx, mut report_rx) = tokio::sync::mpsc::channel(1);
    let failing = FailingReactant;
    let error_handler = PayloadCapturingErrorReactant { sender: report_tx };
    {
        let mut guard = plexus.lock().await;
        guard
            .react(
                neuron.name(),
                vec![failing.new_erased()],
                vec![error_handler.new_erased_error()],
            )
            .await
            .expect("react failed");
    }

    // Send a payload whose `foo` value we expect to see echoed back.
    let mut axon = AxonImpl::new(neuron.clone(), plexus.clone());
    let message = DebugStruct {
        foo: 42,
        bar: String::from("test"),
    };
    axon.transmit(message).await.expect("transmit failed");

    // Wait for the error reactant's report, bounded so a missing report fails the test.
    let outcome = tokio::time::timeout(Duration::from_secs(1), report_rx.recv()).await;
    let (err_msg, val) = match outcome {
        Ok(Some(captured)) => captured,
        Ok(None) => panic!("Channel closed unexpectedly"),
        Err(_) => panic!("Timeout waiting for error handler"),
    };
    assert!(err_msg.contains("Intentional Failure"));
    assert_eq!(val, 42);
}