use crate::codec::{Codec, CodecName};
use crate::erasure::payload::erase_payload;
use crate::erasure::reactant::ReactantErased;
use crate::ganglion::{GanglionError, GanglionInternal};
use crate::logging::TraceContext;
use crate::neuron::Neuron;
use crate::payload::Payload;
use crate::reactant::Reactant;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::Mutex;
use tokio::time::timeout;
use tracing::Instrument;
use uuid::Uuid;
/// Errors returned by [`Axon`] operations.
///
/// The struct-like variants carry the same fields as the [`GanglionError`]
/// variant they are built from in [`AxonError::from_ganglion_error`].
#[derive(Error, Debug)]
pub enum AxonError {
/// Built from `GanglionError::SynapseNotFound`; the message asks for the
/// neuron to be adapted to the named ganglion.
#[error(
"Neuron needs to be adapted to ganglion {ganglion_name} (id: {ganglion_id}): {neuron_name}"
)]
NeuronNotAdapted {
neuron_name: String,
ganglion_name: String,
ganglion_id: Uuid,
},
/// Built from `GanglionError::SynapseLock`.
#[error(
"Failed to acquire lock on synapse: {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})"
)]
SynapseLock {
neuron_name: String,
ganglion_name: String,
ganglion_id: Uuid,
},
/// Built from `GanglionError::Transmit`; `message` is copied over unchanged.
#[error(
"Transmission error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id}): {message}"
)]
Transmit {
neuron_name: String,
ganglion_name: String,
ganglion_id: Uuid,
message: String,
},
/// Built from `GanglionError::Encode`.
#[error("Encode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
Encode {
neuron_name: String,
ganglion_name: String,
ganglion_id: Uuid,
},
/// Built from `GanglionError::Decode`.
#[error("Decode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
Decode {
neuron_name: String,
ganglion_name: String,
ganglion_id: Uuid,
},
/// Returned by the `*_with_timeout` default methods of [`Axon`] when the
/// deadline passes before the transmission future completes.
#[error("Transmission timeout")]
TransmissionTimeout,
/// Wraps a [`GanglionError`] as-is. `from_ganglion_error` uses this for the
/// `Adapt` and `QueueFull` variants.
///
/// NOTE(review): `#[from]` also derives `From<GanglionError>`, so a bare `?`
/// on a `GanglionError` lands here for *every* variant, bypassing the
/// per-variant mapping done by `from_ganglion_error`.
#[error("Ganglion error: {0}")]
GanglionError(#[from] GanglionError),
}
impl AxonError {
    /// Translates a [`GanglionError`] into the corresponding [`AxonError`].
    ///
    /// `SynapseNotFound` becomes [`AxonError::NeuronNotAdapted`]; `SynapseLock`,
    /// `Transmit`, `Encode` and `Decode` become the variant of the same name
    /// with their fields moved across unchanged. `Adapt` and `QueueFull` have
    /// no dedicated counterpart and are wrapped in [`AxonError::GanglionError`].
    pub fn from_ganglion_error(error: GanglionError) -> Self {
        match error {
            GanglionError::SynapseNotFound {
                neuron_name,
                ganglion_name,
                ganglion_id,
            } => Self::NeuronNotAdapted {
                neuron_name,
                ganglion_name,
                ganglion_id,
            },
            GanglionError::SynapseLock {
                neuron_name,
                ganglion_name,
                ganglion_id,
            } => Self::SynapseLock {
                neuron_name,
                ganglion_name,
                ganglion_id,
            },
            GanglionError::Transmit {
                neuron_name,
                ganglion_name,
                ganglion_id,
                message,
            } => Self::Transmit {
                neuron_name,
                ganglion_name,
                ganglion_id,
                message,
            },
            GanglionError::Encode {
                neuron_name,
                ganglion_name,
                ganglion_id,
            } => Self::Encode {
                neuron_name,
                ganglion_name,
                ganglion_id,
            },
            GanglionError::Decode {
                neuron_name,
                ganglion_name,
                ganglion_id,
            } => Self::Decode {
                neuron_name,
                ganglion_name,
                ganglion_id,
            },
            // No axon-level equivalent: hand the original error through untouched.
            passthrough @ (GanglionError::Adapt { .. } | GanglionError::QueueFull { .. }) => {
                Self::GanglionError(passthrough)
            }
        }
    }
}
/// Sending side of a neuron: registers reactants and transmits values of type `T`.
///
/// Every asynchronous method returns a boxed `Send` future that borrows `self`
/// mutably for as long as the future lives. Implementors provide `react`, the
/// four `transmit*` primitives and `neuron_name`; the timeout, batch, readiness
/// and status helpers are default methods layered on top of those.
pub trait Axon<T, C>: Send + Sync
where
    T: Send + Sync + 'static,
    C: Codec<T> + CodecName + Send + Sync + 'static,
{
    /// Registers `reactants` for this axon's neuron.
    ///
    /// What registration involves is defined by the implementor.
    fn react(
        &mut self,
        reactants: Vec<Box<dyn Reactant<T, C> + Send + Sync + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), AxonError>> + Send + '_>>;

    /// Transmits `data`.
    ///
    /// NOTE(review): the meaning of the returned `Vec<()>` (presumably one
    /// entry per delivery) is decided by the implementor — confirm there.
    fn transmit(
        &mut self,
        data: T,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;

    /// Transmits `data` in the context of the caller-supplied `trace_context`.
    fn transmit_with_trace(
        &mut self,
        data: T,
        trace_context: TraceContext,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;

    /// Erased-path counterpart of [`Axon::transmit`]; same inputs and outputs.
    fn transmit_erased(
        &mut self,
        data: T,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;

    /// Erased-path counterpart of [`Axon::transmit_with_trace`].
    fn transmit_erased_with_trace(
        &mut self,
        data: T,
        trace_context: TraceContext,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;

    /// Name of the neuron this axon sends for.
    fn neuron_name(&self) -> String;

    /// Runs [`Axon::transmit`] under a deadline.
    ///
    /// # Errors
    ///
    /// Returns [`AxonError::TransmissionTimeout`] if `timeout_duration` elapses
    /// first; otherwise returns whatever `transmit` produced.
    fn transmit_with_timeout(
        &mut self,
        data: T,
        timeout_duration: Duration,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
        Box::pin(async move {
            timeout(timeout_duration, self.transmit(data))
                .await
                .unwrap_or_else(|_elapsed| Err(AxonError::TransmissionTimeout))
        })
    }

    /// Runs [`Axon::transmit_with_trace`] under a deadline.
    ///
    /// # Errors
    ///
    /// Returns [`AxonError::TransmissionTimeout`] if `timeout_duration` elapses
    /// first; otherwise returns whatever `transmit_with_trace` produced.
    fn transmit_with_trace_with_timeout(
        &mut self,
        data: T,
        trace_context: TraceContext,
        timeout_duration: Duration,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
        Box::pin(async move {
            let transmission = self.transmit_with_trace(data, trace_context);
            timeout(timeout_duration, transmission)
                .await
                .unwrap_or_else(|_elapsed| Err(AxonError::TransmissionTimeout))
        })
    }

    /// Transmits each item of `data_items` in order, one after another.
    ///
    /// On success the outer `Vec` holds one `transmit` result per item, in
    /// input order.
    ///
    /// # Errors
    ///
    /// Stops at the first failing item and returns its error. Results gathered
    /// up to that point are dropped, but those earlier items have already been
    /// transmitted.
    #[allow(clippy::type_complexity)]
    fn transmit_batch(
        &mut self,
        data_items: Vec<T>,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<Vec<()>>, AxonError>> + Send + '_>> {
        Box::pin(async move {
            let mut outcomes = Vec::new();
            for item in data_items {
                let outcome = self.transmit(item).await?;
                outcomes.push(outcome);
            }
            Ok(outcomes)
        })
    }

    /// Like [`Axon::transmit_batch`], but every item is sent with
    /// [`Axon::transmit_with_trace`] using the same `trace_context`.
    ///
    /// # Errors
    ///
    /// Stops at the first failing item and returns its error.
    #[allow(clippy::type_complexity)]
    fn transmit_batch_with_trace(
        &mut self,
        data_items: Vec<T>,
        trace_context: TraceContext,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<Vec<()>>, AxonError>> + Send + '_>> {
        Box::pin(async move {
            let mut outcomes = Vec::new();
            for item in data_items {
                let outcome = self.transmit_with_trace(item, trace_context).await?;
                outcomes.push(outcome);
            }
            Ok(outcomes)
        })
    }

    /// Reports readiness; by default an axon is ready when its neuron name is
    /// non-empty.
    fn is_ready(&self) -> bool {
        let name = self.neuron_name();
        !name.is_empty()
    }

    /// Short human-readable description of the form `Axon[neuron: <name>]`.
    fn status(&self) -> String {
        let name = self.neuron_name();
        format!("Axon[neuron: {}]", name)
    }
}
/// [`Axon`] implementation that pairs one neuron with one shared ganglion.
///
/// Both members are reference-counted, so cloning an `AxonImpl` yields a
/// handle onto the same neuron and the same ganglion.
pub struct AxonImpl<T, C>
where
T: Send + Sync + 'static,
C: Codec<T> + CodecName + Send + Sync + 'static,
{
// Neuron whose name is reported and which is attached to outgoing payloads.
neuron: Arc<dyn Neuron<T, C> + Send + Sync + 'static>,
// Ganglion that `react` and the `transmit*` methods forward to; guarded by
// an async (tokio) mutex.
ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>,
}
impl<T, C> Clone for AxonImpl<T, C>
where
T: Send + Sync + 'static,
C: Codec<T> + CodecName + Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self {
neuron: self.neuron.clone(),
ganglion: self.ganglion.clone(),
}
}
}
impl<T, C> AxonImpl<T, C>
where
    T: Send + Sync + 'static,
    C: Codec<T> + CodecName + Send + Sync + 'static,
{
    /// Creates an axon from a neuron and a ganglion without validating either.
    pub fn new(
        neuron: Arc<dyn Neuron<T, C> + Send + Sync + 'static>,
        ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>,
    ) -> Self {
        Self { neuron, ganglion }
    }

    /// Starts an empty [`AxonBuilder`].
    pub fn builder() -> AxonBuilder<T, C> {
        AxonBuilder::new()
    }

    /// Borrows the neuron handle.
    pub fn neuron(&self) -> &Arc<dyn Neuron<T, C> + Send + Sync + 'static> {
        &self.neuron
    }

    /// Borrows the ganglion handle.
    pub fn ganglion(&self) -> &Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>> {
        &self.ganglion
    }

    /// Returns a new `Arc` pointing at the same neuron.
    pub fn clone_neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync + 'static> {
        Arc::clone(&self.neuron)
    }

    /// Returns a new `Arc` pointing at the same ganglion mutex.
    pub fn clone_ganglion(&self) -> Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>> {
        Arc::clone(&self.ganglion)
    }

    /// Locks the ganglion and returns its `unique_id()`.
    ///
    /// Waits for the ganglion mutex, so this suspends while another task
    /// holds the lock.
    pub async fn ganglion_id(&self) -> Uuid {
        let guard = self.ganglion.lock().await;
        guard.unique_id()
    }

    /// Checks that the neuron has a non-empty name.
    ///
    /// # Errors
    ///
    /// Returns `"Neuron name is empty"` when the neuron's name is empty.
    pub fn validate(&self) -> Result<(), String> {
        let name = self.neuron.name();
        if name.is_empty() {
            Err(String::from("Neuron name is empty"))
        } else {
            Ok(())
        }
    }
}
/// Step-by-step constructor for [`AxonImpl`].
///
/// Both parts start unset; `build` and `build_unchecked` fail if either one is
/// still missing.
pub struct AxonBuilder<T, C>
where
T: Send + Sync + 'static,
C: Codec<T> + CodecName + Send + Sync + 'static,
{
// Set by `with_neuron`; required.
neuron: Option<Arc<dyn Neuron<T, C> + Send + Sync + 'static>>,
// Set by `with_ganglion`; required.
ganglion: Option<Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>,
}
impl<T, C> AxonBuilder<T, C>
where
    T: Send + Sync + 'static,
    C: Codec<T> + CodecName + Send + Sync + 'static,
{
    /// Creates a builder with neither neuron nor ganglion set.
    pub fn new() -> Self {
        let (neuron, ganglion) = (None, None);
        Self { neuron, ganglion }
    }

    /// Sets (or replaces) the neuron.
    pub fn with_neuron(self, neuron: Arc<dyn Neuron<T, C> + Send + Sync + 'static>) -> Self {
        Self {
            neuron: Some(neuron),
            ..self
        }
    }

    /// Sets (or replaces) the ganglion.
    pub fn with_ganglion(
        self,
        ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>,
    ) -> Self {
        Self {
            ganglion: Some(ganglion),
            ..self
        }
    }

    /// Builds the axon and runs [`AxonImpl::validate`] on it.
    ///
    /// # Errors
    ///
    /// Returns `"Neuron is required"` or `"Ganglion is required"` when the
    /// respective part was never set (the neuron is checked first), or the
    /// validation message when validation fails.
    pub fn build(self) -> Result<AxonImpl<T, C>, String> {
        let axon = self.build_unchecked()?;
        axon.validate().map(|()| axon)
    }

    /// Builds the axon without running [`AxonImpl::validate`].
    ///
    /// # Errors
    ///
    /// Returns `"Neuron is required"` or `"Ganglion is required"` when the
    /// respective part was never set; the neuron is checked first.
    pub fn build_unchecked(self) -> Result<AxonImpl<T, C>, String> {
        match (self.neuron, self.ganglion) {
            (None, _) => Err("Neuron is required".to_string()),
            (Some(_), None) => Err("Ganglion is required".to_string()),
            (Some(neuron), Some(ganglion)) => Ok(AxonImpl::new(neuron, ganglion)),
        }
    }
}
impl<T, C> Default for AxonBuilder<T, C>
where
T: Send + Sync + 'static,
C: Codec<T> + CodecName + Send + Sync + 'static,
{
fn default() -> Self {
Self::new()
}
}
impl<T, C> Axon<T, C> for AxonImpl<T, C>
where
    T: Send + Sync + 'static,
    C: Codec<T> + CodecName + Send + Sync + 'static,
{
    /// Erases `reactants` and registers them with the ganglion under this
    /// axon's neuron name.
    ///
    /// The ganglion mutex is held only while the ganglion's `react` call is
    /// made; the future it returns is awaited after the lock is released.
    ///
    /// # Errors
    ///
    /// Ganglion errors are converted with [`AxonError::from_ganglion_error`].
    fn react(
        &mut self,
        reactants: Vec<Box<dyn Reactant<T, C> + Send + Sync + 'static>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), AxonError>> + Send + '_>> {
        let neuron_name = self.neuron.name();
        let ganglion = Arc::clone(&self.ganglion);
        Box::pin(async move {
            let mut erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> =
                Vec::with_capacity(reactants.len());
            for reactant in reactants {
                erased_reactants.push(reactant.erase());
            }
            // Scope the guard so the lock is dropped before awaiting.
            let pending = {
                let mut guard = ganglion.lock().await;
                guard.react(neuron_name, erased_reactants, Vec::new())
            };
            pending.await.map_err(AxonError::from_ganglion_error)
        })
    }

    /// Wraps `data` in a payload built with `Payload::new` and hands the
    /// erased payload to the ganglion.
    ///
    /// # Errors
    ///
    /// Ganglion errors are converted with [`AxonError::from_ganglion_error`].
    fn transmit(
        &mut self,
        data: T,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
        let neuron = Arc::clone(&self.neuron);
        let ganglion = Arc::clone(&self.ganglion);
        Box::pin(async move {
            let payload = Payload::new(data, neuron.clone());
            // Scope the guard so the lock is dropped before awaiting.
            let pending = {
                let mut guard = ganglion.lock().await;
                guard.transmit(erase_payload(payload))
            };
            pending.await.map_err(AxonError::from_ganglion_error)
        })
    }

    /// Like `transmit`, but the payload is assembled with `Payload::from_parts`
    /// and carries a child of `trace_context`.
    ///
    /// # Errors
    ///
    /// Ganglion errors are converted with [`AxonError::from_ganglion_error`].
    fn transmit_with_trace(
        &mut self,
        data: T,
        trace_context: TraceContext,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
        let neuron = Arc::clone(&self.neuron);
        let ganglion = Arc::clone(&self.ganglion);
        Box::pin(async move {
            let parts = Payload::from_parts(
                Arc::new(data),
                neuron.clone(),
                trace_context.new_child(),
            );
            let payload = Arc::new(parts);
            // Scope the guard so the lock is dropped before awaiting.
            let pending = {
                let mut guard = ganglion.lock().await;
                guard.transmit(erase_payload(payload))
            };
            pending.await.map_err(AxonError::from_ganglion_error)
        })
    }

    /// Same transmission as `transmit`, but the payload is erased up front and
    /// the ganglion call runs inside the payload's debug span, with a debug
    /// event emitted at the start.
    ///
    /// # Errors
    ///
    /// Ganglion errors are converted with [`AxonError::from_ganglion_error`].
    fn transmit_erased(
        &mut self,
        data: T,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
        let neuron = Arc::clone(&self.neuron);
        let ganglion = Arc::clone(&self.ganglion);
        Box::pin(async move {
            let erased_payload = erase_payload(Payload::new(data, neuron.clone()));
            let span = erased_payload.span_debug("AxonImpl::transmit_erased");
            let traced_send = async move {
                tracing::debug!("Starting erased transmission");
                // Scope the guard so the lock is dropped before awaiting.
                let pending = {
                    let mut guard = ganglion.lock().await;
                    guard.transmit(erased_payload)
                };
                pending.await.map_err(AxonError::from_ganglion_error)
            };
            traced_send.instrument(span).await
        })
    }

    /// Same transmission as `transmit_with_trace`, but the payload is erased
    /// up front and the ganglion call runs inside the payload's debug span,
    /// with a debug event emitted at the start.
    ///
    /// # Errors
    ///
    /// Ganglion errors are converted with [`AxonError::from_ganglion_error`].
    fn transmit_erased_with_trace(
        &mut self,
        data: T,
        trace_context: TraceContext,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
        let neuron = Arc::clone(&self.neuron);
        let ganglion = Arc::clone(&self.ganglion);
        Box::pin(async move {
            let parts = Payload::from_parts(
                Arc::new(data),
                neuron.clone(),
                trace_context.new_child(),
            );
            let erased_payload = erase_payload(Arc::new(parts));
            let span = erased_payload.span_debug("AxonImpl::transmit_erased_with_trace");
            let traced_send = async move {
                tracing::debug!("Starting erased transmission with trace");
                // Scope the guard so the lock is dropped before awaiting.
                let pending = {
                    let mut guard = ganglion.lock().await;
                    guard.transmit(erased_payload)
                };
                pending.await.map_err(AxonError::from_ganglion_error)
            };
            traced_send.instrument(span).await
        })
    }

    /// Returns the name reported by the underlying neuron.
    fn neuron_name(&self) -> String {
        self.neuron.name()
    }
}
impl<T, C> std::fmt::Debug for AxonImpl<T, C>
where
    T: Send + Sync + 'static,
    C: Codec<T> + CodecName + Send + Sync + 'static,
{
    /// Formats the axon as `AxonImpl { neuron: "<name>", .. }`.
    ///
    /// Only the neuron name is printed. The ganglion lives behind an async
    /// mutex that is not locked here, so it is left out; the trailing `..`
    /// emitted by `finish_non_exhaustive` makes that omission visible instead
    /// of presenting `neuron` as the struct's only field.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AxonImpl")
            .field("neuron", &self.neuron.name())
            .finish_non_exhaustive()
    }
}
// Tests for `AxonImpl`, `AxonBuilder` and `AxonError`. Most of them wire an
// axon to a `Plexus` (used as the ganglion) and observe deliveries through
// tokio mpsc channels wrapped in test reactants.
#[cfg(test)]
mod tests {
use super::*;
use crate::ganglion::{Ganglion, GanglionInprocess};
use crate::neuron::NeuronImpl;
use crate::payload::Payload;
use crate::plexus::Plexus;
use crate::test_utils::{DebugCodec, DebugStruct, TokioMpscReactant, test_namespace};
use tokio::sync::{Mutex, mpsc::channel};
use uuid::Uuid;
/// An axon built with `AxonImpl::new` reports its neuron's name.
#[tokio::test]
async fn test_axon_creation() {
let ns = test_namespace();
let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> =
Arc::new(NeuronImpl::new(ns));
let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
Arc::new(Mutex::new(GanglionInprocess::new()));
let axon = AxonImpl::new(neuron, ganglion);
assert_eq!(axon.neuron_name(), "dev.plexo.DebugStruct.debug");
}
/// `transmit_with_trace` delivers the value to both registered reactants and
/// keeps the caller's correlation id.
#[tokio::test]
async fn test_axon_transmit_with_plexus() {
let ns = test_namespace();
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
let neuron_arc = neuron_impl.clone_to_arc();
let mut plexus = Plexus::new(vec![], vec![]).await;
plexus
.adapt(neuron_arc.clone())
.await
.expect("Failed to adapt neuron");
let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
Arc::new(Mutex::new(plexus));
let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> = vec![
Box::new(TokioMpscReactant::new(tx1)),
Box::new(TokioMpscReactant::new(tx2)),
];
axon.react(reactants).await.expect("Failed to react");
let test_data = DebugStruct {
foo: 42,
bar: "test transmission".to_string(),
};
let trace_context = TraceContext::new();
// Captured before the context is handed to the axon.
let correlation_id = trace_context.correlation_id;
axon.transmit_with_trace(test_data.clone(), trace_context)
.await
.expect("Failed to transmit");
let received1 = rx1.recv().await.expect("Failed to receive payload 1");
let received2 = rx2.recv().await.expect("Failed to receive payload 2");
assert_eq!(received1.value.foo, 42);
assert_eq!(received1.value.bar, "test transmission");
assert_eq!(received2.value.foo, 42);
assert_eq!(received2.value.bar, "test transmission");
assert_eq!(received1.correlation_id(), correlation_id);
assert_eq!(received2.correlation_id(), correlation_id);
}
/// `transmit_erased_with_trace` delivers the value and keeps the caller's
/// correlation id.
#[tokio::test]
async fn test_axon_transmit_erased() {
let ns = test_namespace();
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
let neuron_arc = neuron_impl.clone_to_arc();
let mut plexus = Plexus::new(vec![], vec![]).await;
plexus
.adapt(neuron_arc.clone())
.await
.expect("Failed to adapt neuron");
let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
Arc::new(Mutex::new(plexus));
let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
vec![Box::new(TokioMpscReactant::new(tx))];
axon.react(reactants).await.expect("Failed to react");
let test_data = DebugStruct {
foo: 99,
bar: "erased test".to_string(),
};
let trace_context = TraceContext::new();
let correlation_id = trace_context.correlation_id;
axon.transmit_erased_with_trace(test_data.clone(), trace_context)
.await
.expect("Failed to transmit erased");
let received = rx.recv().await.expect("Failed to receive payload");
assert_eq!(received.value.foo, 99);
assert_eq!(received.value.bar, "erased test");
assert_eq!(received.correlation_id(), correlation_id);
}
/// Plain `transmit` (no trace context supplied) still yields a payload with a
/// non-nil correlation id.
#[tokio::test]
async fn test_axon_transmit_without_correlation_id() {
let ns = test_namespace();
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
let neuron_arc = neuron_impl.clone_to_arc();
let mut plexus = Plexus::new(vec![], vec![]).await;
plexus
.adapt(neuron_arc.clone())
.await
.expect("Failed to adapt neuron");
let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
Arc::new(Mutex::new(plexus));
let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
vec![Box::new(TokioMpscReactant::new(tx))];
axon.react(reactants).await.expect("Failed to react");
let test_data = DebugStruct {
foo: 123,
bar: "auto correlation".to_string(),
};
axon.transmit(test_data.clone())
.await
.expect("Failed to transmit");
let received = rx.recv().await.expect("Failed to receive payload");
assert_eq!(received.value.foo, 123);
assert_eq!(received.value.bar, "auto correlation");
assert_ne!(received.correlation_id(), Uuid::nil());
}
/// Five consecutive `transmit` calls are received in the order they were sent.
#[tokio::test]
async fn test_axon_multiple_transmissions() {
let ns = test_namespace();
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
let neuron_arc = neuron_impl.clone_to_arc();
let mut plexus = Plexus::new(vec![], vec![]).await;
plexus
.adapt(neuron_arc.clone())
.await
.expect("Failed to adapt neuron");
// Capacity 10 so all five payloads fit before the receive loop starts.
let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
Arc::new(Mutex::new(plexus));
let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
vec![Box::new(TokioMpscReactant::new(tx))];
axon.react(reactants).await.expect("Failed to react");
for i in 0..5 {
let test_data = DebugStruct {
foo: i,
bar: format!("transmission {i}"),
};
axon.transmit(test_data)
.await
.expect("Failed to transmit");
}
for i in 0..5 {
let received = rx
.recv()
.await
.unwrap_or_else(|| panic!("Failed to receive payload {i}"));
assert_eq!(received.value.foo, i);
assert_eq!(received.value.bar, format!("transmission {i}"));
}
}
/// A payload transmitted through a plexus-backed axon reaches a raw reactant
/// registered on an external ganglion that was infused into the plexus.
#[tokio::test]
async fn test_axon_transmit_with_plexus_external_ganglion() {
use crate::erasure::reactant::erase_reactant_raw;
use crate::ganglion::GanglionExternal;
use crate::neuron::NeuronImpl;
use crate::payload::PayloadRaw;
use crate::plexus::Plexus;
use crate::test_utils::{
DebugCodec, DebugStruct, GanglionExternalInprocess, TokioMpscReactantRaw,
test_namespace,
};
use tokio::sync::mpsc::channel;
use tokio::time::{Duration, sleep};
let ns = test_namespace();
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
let neuron_arc = neuron_impl.clone_to_arc();
let mut plexus = Plexus::new(vec![], vec![]).await;
plexus
.adapt(neuron_arc.clone())
.await
.expect("Failed to adapt neuron");
let (tx_raw, mut rx_raw) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(2);
let mut external_ganglion = GanglionExternalInprocess::new();
external_ganglion
.adapt(neuron_arc.clone())
.await
.expect("Failed to adapt neuron to external ganglion");
let raw_reactants = vec![erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(
TokioMpscReactantRaw::new(tx_raw),
))];
external_ganglion
.react(neuron_arc.name(), vec![], raw_reactants, vec![])
.await
.expect("Failed to react with external ganglion");
let external_ganglion_arc = Arc::new(Mutex::new(external_ganglion));
// NOTE(review): the result of `infuse_external_ganglion` is discarded; if it
// can fail, the test would only notice through the later assertions.
let _ = plexus
.infuse_external_ganglion(external_ganglion_arc.clone())
.await;
let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
Arc::new(Mutex::new(plexus));
let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
// No typed reactants: delivery is observed only through the raw channel.
let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
vec![];
axon.react(reactants).await.expect("Failed to react");
let test_data = DebugStruct {
foo: 99,
bar: "external ganglion test".to_string(),
};
let trace_context = TraceContext::new();
let correlation_id = trace_context.correlation_id;
axon.transmit_with_trace(test_data.clone(), trace_context)
.await
.expect("Failed to transmit");
// NOTE(review): fixed 100 ms wait before inspecting the queue length —
// presumably delivery to the external ganglion is asynchronous; this is
// timing-dependent and could be flaky on a slow machine.
sleep(Duration::from_millis(100)).await;
assert_eq!(
rx_raw.len(),
1,
"External ganglion should have received exactly one payload"
);
let received_raw = rx_raw
.recv()
.await
.expect("Failed to receive raw payload from external ganglion");
let decoded_data = DebugCodec::decode(&received_raw.value).expect("Failed to decode");
assert_eq!(decoded_data.foo, 99);
assert_eq!(decoded_data.bar, "external ganglion test");
assert_eq!(received_raw.correlation_id(), correlation_id);
}
/// Single `transmit` round trip with one reactant.
#[tokio::test]
async fn test_axon_transmit_simple() {
let ns = test_namespace();
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
let neuron_arc = neuron_impl.clone_to_arc();
let mut plexus = Plexus::new(vec![], vec![]).await;
plexus
.adapt(neuron_arc.clone())
.await
.expect("Failed to adapt neuron");
let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
Arc::new(Mutex::new(plexus));
let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
vec![Box::new(TokioMpscReactant::new(tx))];
axon.react(reactants).await.expect("Failed to react");
let test_data = DebugStruct {
foo: 555,
bar: "simple transmission".to_string(),
};
axon.transmit(test_data.clone())
.await
.expect("Failed to transmit simple");
let received = rx.recv().await.expect("Failed to receive payload");
assert_eq!(received.value.foo, 555);
assert_eq!(received.value.bar, "simple transmission");
assert_ne!(received.correlation_id(), Uuid::nil());
}
/// `transmit_with_timeout` succeeds when the transmission finishes within the
/// one-second deadline. The timeout branch itself is not exercised here.
#[tokio::test]
async fn test_axon_transmit_with_timeout() {
let ns = test_namespace();
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
let neuron_arc = neuron_impl.clone_to_arc();
let mut plexus = Plexus::new(vec![], vec![]).await;
plexus
.adapt(neuron_arc.clone())
.await
.expect("Failed to adapt neuron");
let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
Arc::new(Mutex::new(plexus));
let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
vec![Box::new(TokioMpscReactant::new(tx))];
axon.react(reactants).await.expect("Failed to react");
let test_data = DebugStruct {
foo: 777,
bar: "timeout test".to_string(),
};
let timeout_duration = Duration::from_secs(1);
axon.transmit_with_timeout(test_data.clone(), timeout_duration)
.await
.expect("Failed to transmit with timeout");
let received = rx.recv().await.expect("Failed to receive payload");
assert_eq!(received.value.foo, 777);
assert_eq!(received.value.bar, "timeout test");
}
/// `transmit_batch` returns one result per item and the items arrive in
/// input order.
#[tokio::test]
async fn test_axon_transmit_batch() {
let ns = test_namespace();
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
let neuron_arc = neuron_impl.clone_to_arc();
let mut plexus = Plexus::new(vec![], vec![]).await;
plexus
.adapt(neuron_arc.clone())
.await
.expect("Failed to adapt neuron");
let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
Arc::new(Mutex::new(plexus));
let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
vec![Box::new(TokioMpscReactant::new(tx))];
axon.react(reactants).await.expect("Failed to react");
let batch_data = vec![
DebugStruct {
foo: 1,
bar: "batch item 1".to_string(),
},
DebugStruct {
foo: 2,
bar: "batch item 2".to_string(),
},
DebugStruct {
foo: 3,
bar: "batch item 3".to_string(),
},
];
let results = axon
.transmit_batch(batch_data.clone())
.await
.expect("Failed to transmit batch");
assert_eq!(results.len(), 3);
// Items are numbered from 1, hence the `i + 1` below.
for i in 0..3 {
let received = rx
.recv()
.await
.unwrap_or_else(|| panic!("Failed to receive batch item {i}"));
assert_eq!(received.value.foo, (i + 1));
assert_eq!(received.value.bar, format!("batch item {}", i + 1));
assert_ne!(received.correlation_id(), Uuid::nil());
}
}
/// `is_ready`, `status`, `validate` and `neuron_name` on a freshly built axon.
#[tokio::test]
async fn test_axon_status_and_validation() {
let ns = test_namespace();
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
let neuron_arc = neuron_impl.clone_to_arc();
let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
Arc::new(Mutex::new(GanglionInprocess::new()));
let axon = AxonImpl::new(neuron_arc, ganglion);
assert!(axon.is_ready());
let status = axon.status();
assert!(status.contains("dev.plexo.DebugStruct.debug"));
assert!(axon.validate().is_ok());
assert_eq!(axon.neuron_name(), "dev.plexo.DebugStruct.debug");
}
/// Builder happy path, the missing-ganglion error, and `build_unchecked`.
#[tokio::test]
async fn test_axon_builder() {
let ns = test_namespace();
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
let neuron_arc = neuron_impl.clone_to_arc();
let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
Arc::new(Mutex::new(GanglionInprocess::new()));
let axon = AxonImpl::builder()
.with_neuron(neuron_arc.clone())
.with_ganglion(ganglion.clone())
.build()
.expect("Failed to build axon");
assert_eq!(axon.neuron_name(), "dev.plexo.DebugStruct.debug");
assert!(axon.is_ready());
// Ganglion deliberately left unset to trigger the required-field error.
let result = AxonImpl::<DebugStruct, DebugCodec>::builder()
.with_neuron(neuron_arc.clone())
.build();
assert!(result.is_err());
assert!(result.unwrap_err().contains("Ganglion is required"));
let axon_unchecked = AxonImpl::builder()
.with_neuron(neuron_arc.clone())
.with_ganglion(ganglion.clone())
.build_unchecked()
.expect("Failed to build unchecked axon");
assert_eq!(axon_unchecked.neuron_name(), "dev.plexo.DebugStruct.debug");
}
/// Accessors: `clone_neuron`, `ganglion_id` and `neuron`.
#[tokio::test]
async fn test_axon_additional_methods() {
let ns = test_namespace();
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
let neuron_arc = neuron_impl.clone_to_arc();
let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
Arc::new(Mutex::new(GanglionInprocess::new()));
let axon = AxonImpl::new(neuron_arc.clone(), ganglion.clone());
let cloned_neuron = axon.clone_neuron();
assert_eq!(cloned_neuron.name(), neuron_arc.name());
let ganglion_id = axon.ganglion_id().await;
assert_ne!(ganglion_id, Uuid::nil());
assert_eq!(axon.neuron().name(), "dev.plexo.DebugStruct.debug");
}
/// `from_ganglion_error` maps `SynapseNotFound`, `SynapseLock` and `Transmit`
/// field-for-field, and the `Display` output includes the carried fields.
///
/// NOTE(review): the `Encode`, `Decode`, `Adapt` and `QueueFull` conversions
/// are not covered here.
#[test]
fn test_axon_error_variants_and_conversion() {
let ganglion_id = Uuid::now_v7();
let ganglion_name = "TestGanglion".to_string();
let neuron_name = "test_neuron".to_string();
let ganglion_synapse_not_found = GanglionError::SynapseNotFound {
neuron_name: neuron_name.clone(),
ganglion_name: ganglion_name.clone(),
ganglion_id,
};
let axon_neuron_not_adapted = AxonError::from_ganglion_error(ganglion_synapse_not_found);
match axon_neuron_not_adapted {
AxonError::NeuronNotAdapted {
neuron_name: n,
ganglion_name: g,
ganglion_id: id,
} => {
assert_eq!(n, neuron_name);
assert_eq!(g, ganglion_name);
assert_eq!(id, ganglion_id);
}
_ => panic!("Expected NeuronNotAdapted variant"),
}
let ganglion_synapse_lock = GanglionError::SynapseLock {
neuron_name: neuron_name.clone(),
ganglion_name: ganglion_name.clone(),
ganglion_id,
};
let axon_synapse_lock = AxonError::from_ganglion_error(ganglion_synapse_lock);
match axon_synapse_lock {
AxonError::SynapseLock {
neuron_name: n,
ganglion_name: g,
ganglion_id: id,
} => {
assert_eq!(n, neuron_name);
assert_eq!(g, ganglion_name);
assert_eq!(id, ganglion_id);
}
// NOTE(review): the message says `SynapseLockError`, but the variant is
// named `SynapseLock`.
_ => panic!("Expected SynapseLockError variant"),
}
let ganglion_transmit = GanglionError::Transmit {
neuron_name: neuron_name.clone(),
ganglion_name: ganglion_name.clone(),
ganglion_id,
message: "test transmit error".to_string(),
};
let axon_transmit = AxonError::from_ganglion_error(ganglion_transmit);
match axon_transmit {
AxonError::Transmit {
neuron_name: n,
ganglion_name: g,
ganglion_id: id,
message: m,
} => {
assert_eq!(n, neuron_name);
assert_eq!(g, ganglion_name);
assert_eq!(id, ganglion_id);
assert_eq!(m, "test transmit error");
}
_ => panic!("Expected Transmit variant"),
}
// Display checks on directly constructed errors.
let direct_neuron_not_adapted = AxonError::NeuronNotAdapted {
neuron_name: neuron_name.clone(),
ganglion_name: ganglion_name.clone(),
ganglion_id,
};
assert!(direct_neuron_not_adapted.to_string().contains(&neuron_name));
assert!(
direct_neuron_not_adapted
.to_string()
.contains(&ganglion_name)
);
let direct_transmit = AxonError::Transmit {
neuron_name: neuron_name.clone(),
ganglion_name: ganglion_name.clone(),
ganglion_id,
message: "direct message".to_string(),
};
assert!(direct_transmit.to_string().contains(&neuron_name));
assert!(direct_transmit.to_string().contains(&ganglion_name));
assert!(direct_transmit.to_string().contains("direct message"));
}
}