use crate::backpressure::{BackpressureConfig, BackpressureQueue};
use crate::codec::{Codec, CodecName};
use crate::dendrite::{Dendrite, DendriteError};
use crate::erasure::neuron::{NeuronErased, NeuronErasedWrapper};
use crate::erasure::reactant::ReactantErased;
use crate::logging::LogTrace;
use crate::neuron::Neuron;
use crate::payload::{Payload, PayloadRaw};
use crate::reactant::{ErrorReactant, Reactant, ReactantRaw};
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use parking_lot::RwLock;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum SynapseError {
#[error("Queue for neuron '{neuron_name}' is full")]
QueueFull { neuron_name: String },
#[error(transparent)]
Dendrite(#[from] DendriteError),
#[error("Type conversion failed for neuron '{neuron_name}'")]
NeuronTypeConversion { neuron_name: String },
#[error("Type conversion failed for reactant in neuron '{neuron_name}'")]
ReactantTypeConversion { neuron_name: String },
#[error("No dendrite available for processing in neuron '{neuron_name}'")]
NoDendrite { neuron_name: String },
}
use tracing::Instrument;
pub trait SynapseInternal<T, C>
where
C: Codec<T> + CodecName + Send + Sync + 'static,
T: Send + Sync + 'static,
{
fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync>;
fn transduce(
&self,
payload: Arc<Payload<T, C>>,
) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>>;
fn transmit(
&self,
payload: Arc<Payload<T, C>>,
) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>>;
fn react(
&mut self,
reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
) -> Result<(), SynapseError>;
}
pub trait SynapseExternal<T, C>
where
C: Codec<T> + CodecName + Send + Sync + 'static,
T: Send + Sync + 'static,
{
fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync>;
fn transduce(
&self,
payload: Arc<PayloadRaw<T, C>>,
) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static;
fn transmit(
&self,
payload: Arc<PayloadRaw<T, C>>,
) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static;
fn react(
&mut self,
reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
) -> Result<(), SynapseError>;
}
pub trait RawSender: Send + Sync {
fn send(
&self,
topic: &str,
data: Vec<u8>,
) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
}
pub struct BackpressureSender<S: RawSender> {
queue: Arc<BackpressureQueue<(String, Vec<u8>)>>,
_marker: PhantomData<S>,
}
impl<S: RawSender + 'static> BackpressureSender<S> {
pub fn new(inner: S, config: BackpressureConfig, neuron_name: String) -> Self {
let inner_arc = Arc::new(inner);
let inner_clone = inner_arc.clone();
let queue = BackpressureQueue::<(String, Vec<u8>)>::new(
neuron_name,
config,
move |(topic, data)| {
let s = inner_clone.clone();
async move {
if let Err(e) = s.send(&topic, data).await {
eprintln!("BackpressureSender failed to send: {}", e);
}
}
},
);
Self {
queue: Arc::new(queue),
_marker: PhantomData,
}
}
}
impl<S: RawSender + 'static> RawSender for BackpressureSender<S> {
fn send(
&self,
topic: &str,
data: Vec<u8>,
) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> {
let q = self.queue.clone();
let topic = topic.to_string();
Box::pin(async move {
q.push((topic, data)).await.map_err(|e| e.to_string())
})
}
}
pub struct BackpressureExternalSynapse<T, C, S>
where
C: Codec<T> + CodecName + Send + Sync + 'static,
T: Send + Sync + 'static,
S: SynapseExternal<T, C> + Send + Sync + 'static,
{
inner: Arc<RwLock<S>>,
neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
queue: Arc<BackpressureQueue<Arc<PayloadRaw<T, C>>>>,
_phantom: PhantomData<(T, C)>,
}
impl<T, C, S> BackpressureExternalSynapse<T, C, S>
where
C: Codec<T> + CodecName + Send + Sync + 'static,
T: Send + Sync + 'static,
S: SynapseExternal<T, C> + Send + Sync + 'static,
{
pub fn new(synapse: S, config: BackpressureConfig) -> Self {
let neuron = synapse.neuron();
let neuron_name = neuron.name();
let synapse_arc = Arc::new(RwLock::new(synapse));
let inner_clone = synapse_arc.clone();
let queue = BackpressureQueue::new(
neuron_name,
config,
move |payload: Arc<PayloadRaw<T, C>>| {
let s = inner_clone.clone();
async move {
let future = {
let guard = s.read();
guard.transmit(payload)
};
let _ = future.await;
}
},
);
Self {
inner: synapse_arc,
neuron,
queue: Arc::new(queue),
_phantom: PhantomData,
}
}
}
impl<T, C, S> SynapseExternal<T, C> for BackpressureExternalSynapse<T, C, S>
where
C: Codec<T> + CodecName + Send + Sync + 'static,
T: Send + Sync + 'static,
S: SynapseExternal<T, C> + Send + Sync + 'static,
{
fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync> {
self.neuron.clone()
}
fn transduce(
&self,
payload: Arc<PayloadRaw<T, C>>,
) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static {
self.transmit(payload)
}
fn transmit(
&self,
payload: Arc<PayloadRaw<T, C>>,
) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static {
let q = self.queue.clone();
Box::pin(async move {
q.push(payload).await?;
Ok((vec![], vec![]))
})
}
fn react(
&mut self,
reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
) -> Result<(), SynapseError> {
let mut guard = self.inner.write();
guard.react(reactants, raw_reactants, error_reactants)
}
}
pub struct SynapseInprocess<T, C>
where
C: Codec<T> + CodecName + Send + Sync + 'static,
T: Sync + Send + 'static,
{
neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
dendrite: Option<Dendrite<T, C>>,
_codec_marker: PhantomData<fn() -> &'static ()>,
}
impl<T, C> SynapseInprocess<T, C>
where
C: Codec<T> + CodecName + Send + Sync + 'static,
T: Sync + Send + 'static,
{
pub fn new(
neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
) -> Self {
let dendrite = if !reactants.is_empty() || !error_reactants.is_empty() {
Some(Dendrite::new(neuron.clone(), reactants, error_reactants))
} else {
None
};
Self {
neuron,
dendrite,
_codec_marker: PhantomData,
}
}
pub fn from_erased(
neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
) -> Option<Self>
where
T: 'static,
C: 'static,
{
use std::any::TypeId;
let neuron_type_id = neuron.payload_type_id();
let codec_type_id = neuron.codec_type_id();
if neuron_type_id != TypeId::of::<T>() || codec_type_id != TypeId::of::<C>() {
return None;
}
let typed_neuron = match neuron.as_any().downcast_ref::<NeuronErasedWrapper<T, C>>() {
Some(wrapper) => wrapper.get_typed_neuron(),
None => return None,
};
let typed_reactants: Vec<_> = reactants
.into_iter()
.filter_map(|erased_reactant| {
if erased_reactant.payload_type_id() != TypeId::of::<T>()
|| erased_reactant.codec_type_id() != TypeId::of::<C>()
{
return None;
}
let any_arc = erased_reactant.clone_to_any();
any_arc
.downcast::<Arc<dyn Reactant<T, C> + Send + Sync + 'static>>()
.ok()
.map(|boxed_arc| (*boxed_arc).clone())
})
.collect();
Some(Self::new(typed_neuron.clone(), typed_reactants, vec![]))
}
}
impl<T, C> SynapseInternal<T, C> for SynapseInprocess<T, C>
where
C: Codec<T> + CodecName + Send + Sync + 'static,
T: Sync + Send + 'static,
{
fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync> {
self.neuron.clone()
}
fn transduce(
&self,
payload: Arc<Payload<T, C>>,
) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
let span = payload.span_debug("SynapseInprocess::transduce");
match &self.dendrite {
Some(dendrite) => {
let future = dendrite.transduce(payload);
Box::pin(
async move {
tracing::debug!("SynapseInprocess::transduce calling dendrite.transduce");
future.await.map_err(SynapseError::from)
}
.instrument(span),
)
}
None => Box::pin(
async move {
tracing::debug!("SynapseInprocess::transduce no dendrite, returning empty vec");
Ok(vec![])
}
.instrument(span),
),
}
}
fn transmit(
&self,
payload: Arc<Payload<T, C>>,
) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
let span = payload.span_debug("SynapseInprocess::transmit");
let future = self.transduce(payload);
Box::pin(
async move {
tracing::debug!("SynapseInprocess::transmit called");
future.await
}
.instrument(span),
)
}
fn react(
&mut self,
reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
) -> Result<(), SynapseError> {
if reactants.is_empty() && error_reactants.is_empty() {
return Ok(());
}
match &self.dendrite {
Some(dendrite) => {
dendrite.add_reactants(reactants)?;
dendrite.add_error_reactants(error_reactants)?;
}
None => {
self.dendrite = Some(Dendrite::new(
self.neuron.clone(),
reactants,
error_reactants,
));
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::logging::TraceContext;
use crate::neuron::NeuronImpl;
use crate::payload::PayloadRaw;
use crate::test_utils::{
DebugCodec, DebugStruct, SynapseExternalInprocess, TokioMpscReactant, TokioMpscReactantRaw,
test_namespace,
};
use tokio::sync::mpsc::channel;
use uuid::Uuid;
#[tokio::test]
async fn test_synapse_inprocess_transmit() {
let ns = test_namespace();
let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> = vec![
Arc::new(TokioMpscReactant { sender: tx.clone() }),
Arc::new(TokioMpscReactant { sender: tx.clone() }),
];
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> = Arc::new(neuron_impl);
let synapse = SynapseInprocess::new(neuron.clone(), reactants, vec![]);
let debug_struct = Arc::new(DebugStruct {
foo: 42,
bar: "test_value".to_owned(),
});
let correlation_id = Uuid::now_v7();
let span_id = Uuid::now_v7().as_u128() as u64;
let _ = synapse
.transmit(
Payload::builder()
.value((*debug_struct).clone())
.correlation_id(correlation_id)
.neuron(neuron.clone())
.span_id(span_id)
.build()
.unwrap(),
)
.await;
assert_eq!(rx.len(), 2);
let p = rx.recv().await.unwrap();
assert_eq!(p.value, debug_struct);
assert_eq!(p.correlation_id(), correlation_id);
assert_eq!(p.span_id(), span_id);
assert_eq!(rx.len(), 1);
let p2 = rx.recv().await.unwrap();
assert_eq!(p2.value, debug_struct);
assert_eq!(p2.correlation_id(), correlation_id);
assert_eq!(p2.span_id(), span_id);
assert_eq!(rx.len(), 0);
let debug_struct_2 = Arc::new(DebugStruct {
foo: 49,
bar: "foo_bar".to_owned(),
});
let correlation_id_2 = Uuid::now_v7();
let span_id_2 = Uuid::now_v7().as_u128() as u64;
let _ = synapse
.transmit(
Payload::builder()
.value((*debug_struct_2).clone())
.correlation_id(correlation_id_2)
.neuron(neuron.clone())
.span_id(span_id_2)
.build()
.unwrap(),
)
.await;
let p3 = rx.recv().await.unwrap();
assert_eq!(p3.value, debug_struct_2);
assert_eq!(p3.correlation_id(), correlation_id_2);
assert_eq!(p3.span_id(), span_id_2);
assert_eq!(rx.len(), 1);
let p4 = rx.recv().await.unwrap();
assert_eq!(p4.value, debug_struct_2);
assert_eq!(p4.correlation_id(), correlation_id_2);
assert_eq!(p4.span_id(), span_id_2);
assert_eq!(rx.len(), 0);
}
#[tokio::test]
async fn test_synapse_inprocess_with_none_reactants() {
let ns = test_namespace();
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> = Arc::new(neuron_impl);
let synapse = SynapseInprocess::new(neuron.clone(), vec![], vec![]);
let debug_struct = Arc::new(DebugStruct {
foo: 42,
bar: "test_value".to_owned(),
});
let correlation_id = Uuid::now_v7();
let span_id = Uuid::now_v7().as_u128() as u64;
let result = synapse
.transmit(
Payload::builder()
.value((*debug_struct).clone())
.correlation_id(correlation_id)
.neuron(neuron.clone())
.span_id(span_id)
.build()
.unwrap(),
)
.await;
assert_eq!(
result.expect("Should succeed").len(),
0,
"Should return empty vector when dendrite is None"
);
}
#[tokio::test]
async fn test_synapse_external_with_none_reactants() {
let ns = test_namespace();
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> =
Arc::new(neuron_impl.clone());
let synapse = SynapseExternalInprocess::new(neuron.clone(), vec![], vec![], vec![]);
let debug_struct_value = DebugStruct {
foo: 42,
bar: "test_value".to_owned(),
};
let debug_struct_arc = Arc::new(debug_struct_value);
let correlation_id = Uuid::now_v7();
let direct_neuron_encoder: NeuronImpl<DebugStruct, DebugCodec> =
NeuronImpl::new(ns.clone());
let encoded = direct_neuron_encoder
.encode(debug_struct_arc.as_ref())
.expect("Failed to encode");
let span_id = Uuid::now_v7().as_u128() as u64;
let result = synapse
.transmit(Arc::new(PayloadRaw {
value: Arc::new(encoded.clone()),
neuron: neuron.clone(),
trace: TraceContext::from_parts(correlation_id, span_id, None),
}))
.await
.expect("Should succeed");
assert_eq!(
result.0.len(),
0,
"Should return empty vector for reactants when dendrite_decoder is None"
);
assert_eq!(
result.1.len(),
0,
"Should return empty vector for raw_reactants when dendrite_decoder is None"
);
}
#[tokio::test]
async fn test_synapse_external_inprocess_transmit() {
let ns = test_namespace();
let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
let (tx_raw, mut rx_raw) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(1);
let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
vec![Arc::new(TokioMpscReactant { sender: tx.clone() })];
let raw_reactants: Vec<Arc<dyn ReactantRaw<DebugStruct, DebugCodec> + Send + Sync>> =
vec![Arc::new(TokioMpscReactantRaw {
sender: tx_raw.clone(),
})];
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> =
Arc::new(neuron_impl.clone());
let synapse =
SynapseExternalInprocess::new(neuron.clone(), reactants, raw_reactants, vec![]);
let debug_struct_value = DebugStruct {
foo: 42,
bar: "test_value".to_owned(),
};
let debug_struct_arc = Arc::new(debug_struct_value);
let correlation_id = Uuid::now_v7();
let direct_neuron_encoder: NeuronImpl<DebugStruct, DebugCodec> =
NeuronImpl::new(ns.clone());
let encoded = direct_neuron_encoder
.encode(debug_struct_arc.as_ref())
.expect("Failed to encode");
let span_id = Uuid::now_v7().as_u128() as u64;
let _ = synapse
.transmit(Arc::new(PayloadRaw {
value: Arc::new(encoded.clone()),
neuron: neuron.clone(),
trace: TraceContext::from_parts(correlation_id, span_id, None),
}))
.await;
let p = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
.await
.expect("Timeout rx1")
.expect("Closed rx1");
assert_eq!(p.value, debug_struct_arc);
assert_eq!(p.correlation_id(), correlation_id);
assert_eq!(p.span_id(), span_id);
let p_raw = tokio::time::timeout(std::time::Duration::from_millis(100), rx_raw.recv())
.await
.expect("Timeout raw_rx1")
.expect("Closed raw_rx1");
assert_eq!(p_raw.value.as_slice(), encoded.as_slice());
assert_eq!(p_raw.correlation_id(), correlation_id);
assert_eq!(p_raw.span_id(), span_id);
let debug_struct_2_value = DebugStruct {
foo: 49,
bar: "foo_bar".to_owned(),
};
let debug_struct_2_arc = Arc::new(debug_struct_2_value);
let correlation_id_2 = Uuid::now_v7();
let encoded_2 = direct_neuron_encoder
.encode(debug_struct_2_arc.as_ref())
.expect("Failed to encode");
let span_id_2 = Uuid::now_v7().as_u128() as u64;
let _ = synapse
.transmit(Arc::new(PayloadRaw {
value: Arc::new(encoded_2.clone()),
neuron: neuron.clone(),
trace: TraceContext::from_parts(correlation_id_2, span_id_2, None),
}))
.await;
let p2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
.await
.expect("Timeout rx2")
.expect("Closed rx2");
assert_eq!(p2.value, debug_struct_2_arc);
assert_eq!(p2.correlation_id(), correlation_id_2);
assert_eq!(p2.span_id(), span_id_2);
let p_raw2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx_raw.recv())
.await
.expect("Timeout raw_rx2")
.expect("Closed raw_rx2");
assert_eq!(p_raw2.value.as_slice(), encoded_2.as_slice());
assert_eq!(p_raw2.correlation_id(), correlation_id_2);
assert_eq!(p_raw2.span_id(), span_id_2);
}
}