pub use crate::codec::{Codec, CodecError, CodecName};
pub use crate::ganglion::GanglionInprocess;
pub use crate::neuron::NeuronImpl;
use crate::dendrite::DendriteDecoder;
use crate::erasure::payload::{PayloadErased, PayloadRawErased, SimplePayloadRawErased};
use crate::erasure::reactant::{
ErrorReactantErased, ReactantErased, ReactantRawErased, erase_reactant, erase_reactant_raw,
};
use crate::erasure::synapse::{SynapseExternalErased, erase_synapse_external};
use crate::ganglion::{Ganglion, GanglionError, GanglionExternal};
use crate::namespace::NamespaceImpl;
use crate::neuron::Neuron;
use crate::payload::{Payload, PayloadRaw};
use crate::reactant::{ErrorReactant, Reactant, ReactantError, ReactantRaw};
use crate::synapse::{SynapseError, SynapseExternal};
use crate::utils::struct_name_of_type;
use regex::Regex;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use parking_lot::RwLock;
use tokio::sync::mpsc::Sender;
use uuid::Uuid;
use tracing::Instrument;
pub fn test_namespace() -> Arc<NamespaceImpl> {
Arc::new(NamespaceImpl {
delimiter: ".",
parts: vec!["dev", "plexo"],
})
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PingMsg {
pub seq: u64,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PongMsg {
pub seq: u64,
}
#[derive(Debug, Clone)]
pub struct PingCodec;
#[derive(Debug, Clone)]
pub struct PongCodec;
impl CodecName for PingCodec {
fn name() -> &'static str {
"ping"
}
}
impl Codec<PingMsg> for PingCodec {
fn encode(v: &PingMsg) -> Result<Vec<u8>, CodecError> {
Ok(format!("{v:?}").into_bytes())
}
fn decode(b: &[u8]) -> Result<PingMsg, CodecError> {
let s = String::from_utf8_lossy(b);
let re = Regex::new(r#"PingMsg \{ seq: (?P<seq>\d+) \}"#).unwrap();
if let Some(caps) = re.captures(&s) {
Ok(PingMsg {
seq: caps["seq"].parse().unwrap(),
})
} else {
Err(CodecError::Decode("fail".into()))
}
}
}
impl CodecName for PongCodec {
fn name() -> &'static str {
"pong"
}
}
impl Codec<PongMsg> for PongCodec {
fn encode(v: &PongMsg) -> Result<Vec<u8>, CodecError> {
Ok(format!("{v:?}").into_bytes())
}
fn decode(b: &[u8]) -> Result<PongMsg, CodecError> {
let s = String::from_utf8_lossy(b);
let re = Regex::new(r#"PongMsg \{ seq: (?P<seq>\d+) \}"#).unwrap();
if let Some(caps) = re.captures(&s) {
Ok(PongMsg {
seq: caps["seq"].parse().unwrap(),
})
} else {
Err(CodecError::Decode("fail".into()))
}
}
}
pub type PingNeuron = NeuronImpl<PingMsg, PingCodec>;
pub type PongNeuron = NeuronImpl<PongMsg, PongCodec>;
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DebugStruct {
pub foo: i32,
pub bar: String,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ResponseStruct {
pub ganglion_id: u32,
pub response_message: String,
}
#[derive(Debug, Clone)]
pub struct DebugCodec;
#[derive(Debug, Clone)]
pub struct ResponseCodec;
impl CodecName for DebugCodec {
fn name() -> &'static str {
"debug"
}
}
impl Codec<DebugStruct> for DebugCodec {
fn encode(value: &DebugStruct) -> Result<Vec<u8>, CodecError> {
Ok(format!("{value:?}").into_bytes())
}
fn decode(bytes: &[u8]) -> Result<DebugStruct, CodecError> {
let s = String::from_utf8_lossy(bytes);
let re = Regex::new(r#"DebugStruct \{ foo: (?P<foo>\d+), bar: "(?P<bar>[^"]+)" \}"#)
.map_err(|e| CodecError::Decode(format!("Failed to create regex: {e}")))?;
if let Some(caps) = re.captures(s.as_ref()) {
let val: i32 = caps
.name("foo")
.ok_or_else(|| CodecError::Decode("Missing 'foo' field".to_string()))?
.as_str()
.parse()
.map_err(|e| CodecError::Decode(format!("Failed to parse foo: {e}")))?;
let bar = caps
.name("bar")
.ok_or_else(|| CodecError::Decode("Missing 'bar' field".to_string()))?
.as_str()
.to_string();
Ok(DebugStruct { foo: val, bar })
} else {
Err(CodecError::Decode(
"Failed to match debug struct pattern".to_string(),
))
}
}
}
impl CodecName for ResponseCodec {
fn name() -> &'static str {
"response"
}
}
impl Codec<ResponseStruct> for ResponseCodec {
fn encode(value: &ResponseStruct) -> Result<Vec<u8>, CodecError> {
Ok(format!("{value:?}").into_bytes())
}
fn decode(bytes: &[u8]) -> Result<ResponseStruct, CodecError> {
let s = String::from_utf8_lossy(bytes);
let re = Regex::new(r#"ResponseStruct \{ ganglion_id: (?P<ganglion_id>\d+), response_message: "(?P<response_message>[^"]+)" \}"#)
.map_err(|e| CodecError::Decode(format!("Failed to create regex: {e}")))?;
if let Some(caps) = re.captures(s.as_ref()) {
let ganglion_id: u32 = caps
.name("ganglion_id")
.ok_or_else(|| CodecError::Decode("Missing 'ganglion_id' field".to_string()))?
.as_str()
.parse()
.map_err(|e| CodecError::Decode(format!("Failed to parse ganglion_id: {e}")))?;
let response_message = caps
.name("response_message")
.ok_or_else(|| CodecError::Decode("Missing 'response_message' field".to_string()))?
.as_str()
.to_string();
Ok(ResponseStruct {
ganglion_id,
response_message,
})
} else {
Err(CodecError::Decode(
"Failed to match response struct pattern".to_string(),
))
}
}
}
#[allow(unused)]
#[derive(Clone)]
pub struct NoopReactant;
impl<C> Reactant<DebugStruct, C> for NoopReactant
where
C: Codec<DebugStruct> + CodecName + Send + Sync + 'static,
{
fn react(
&self,
_p: Arc<Payload<DebugStruct, C>>,
) -> Pin<Box<dyn Future<Output = Result<(), ReactantError>> + Send + 'static>> {
Box::pin(async move { Ok(()) })
}
fn erase(self: Box<Self>) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
erase_reactant::<DebugStruct, C, NoopReactant>(self)
}
}
impl<C> ReactantRaw<DebugStruct, C> for NoopReactant
where
C: Codec<DebugStruct> + CodecName + Send + Sync + 'static,
{
fn react(
&self,
_p: Arc<PayloadRaw<DebugStruct, C>>,
) -> Pin<Box<dyn Future<Output = Result<(), ReactantError>> + Send + 'static>> {
Box::pin(async move { Ok(()) })
}
fn erase_raw(self: Box<Self>) -> Arc<dyn ReactantRawErased + Send + Sync + 'static> {
erase_reactant_raw::<DebugStruct, C, NoopReactant>(self)
}
}
#[allow(unused)]
#[derive(Clone)]
pub struct NoopReactant2;
impl<C> Reactant<DebugStruct, C> for NoopReactant2
where
C: Codec<DebugStruct> + CodecName + Send + Sync + 'static,
{
fn react(
&self,
_p: Arc<Payload<DebugStruct, C>>,
) -> Pin<Box<dyn Future<Output = Result<(), ReactantError>> + Send + 'static>> {
Box::pin(async move { Ok(()) })
}
fn erase(self: Box<Self>) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
erase_reactant::<DebugStruct, C, NoopReactant2>(self)
}
}
impl<C> ReactantRaw<DebugStruct, C> for NoopReactant2
where
C: Codec<DebugStruct> + CodecName + Send + Sync + 'static,
{
fn react(
&self,
_p: Arc<PayloadRaw<DebugStruct, C>>,
) -> Pin<Box<dyn Future<Output = Result<(), ReactantError>> + Send + 'static>> {
Box::pin(async move { Ok(()) })
}
fn erase_raw(self: Box<Self>) -> Arc<dyn ReactantRawErased + Send + Sync + 'static> {
erase_reactant_raw::<DebugStruct, C, NoopReactant2>(self)
}
}
#[derive(Clone)]
pub struct TokioMpscReactant {
pub sender: Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
}
impl TokioMpscReactant {
pub fn new(sender: Sender<Arc<Payload<DebugStruct, DebugCodec>>>) -> Self {
Self { sender }
}
}
impl Reactant<DebugStruct, DebugCodec> for TokioMpscReactant {
fn react(
&self,
p: Arc<Payload<DebugStruct, DebugCodec>>,
) -> Pin<Box<dyn Future<Output = Result<(), ReactantError>> + Send + 'static>> {
let sender = self.sender.clone();
tracing::debug!(val = ?p.value, "TokioMpscReactant::react called");
Box::pin(async move {
tracing::debug!("TokioMpscReactant::react sending to channel");
if let Err(e) = sender.try_send(p) {
tracing::warn!("TokioMpscReactant failed to send payload: {e}");
} else {
tracing::debug!("TokioMpscReactant sent to channel successfully");
}
Ok(())
})
}
fn erase(self: Box<Self>) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
erase_reactant::<DebugStruct, DebugCodec, TokioMpscReactant>(self)
}
}
pub struct TokioMpscReactantGeneric<T, C> {
pub sender: Sender<Arc<Payload<T, C>>>,
_phantom: PhantomData<(T, C)>,
}
impl<T, C> Clone for TokioMpscReactantGeneric<T, C> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
_phantom: PhantomData,
}
}
}
impl<T, C> TokioMpscReactantGeneric<T, C> {
pub fn new(sender: Sender<Arc<Payload<T, C>>>) -> Self {
Self {
sender,
_phantom: PhantomData,
}
}
}
impl<T, C> Reactant<T, C> for TokioMpscReactantGeneric<T, C>
where
T: Send + Sync + Clone + 'static,
C: Codec<T> + CodecName + Send + Sync + Clone + 'static,
{
fn react(
&self,
p: Arc<Payload<T, C>>,
) -> Pin<Box<dyn Future<Output = Result<(), ReactantError>> + Send + 'static>> {
let tx = self.sender.clone();
tracing::debug!("TokioMpscReactantGeneric::react called");
Box::pin(async move {
if let Err(e) = tx.try_send(p) {
tracing::warn!("TokioMpscReactantGeneric failed to send payload: {e}");
}
Ok(())
})
}
fn erase(self: Box<Self>) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
erase_reactant::<T, C, _>(self)
}
}
#[derive(Clone)]
pub struct TokioMpscReactantRaw {
pub sender: Sender<Arc<PayloadRaw<DebugStruct, DebugCodec>>>,
}
impl TokioMpscReactantRaw {
pub fn new(sender: Sender<Arc<PayloadRaw<DebugStruct, DebugCodec>>>) -> Self {
Self { sender }
}
}
impl ReactantRaw<DebugStruct, DebugCodec> for TokioMpscReactantRaw {
fn react(
&self,
p: Arc<PayloadRaw<DebugStruct, DebugCodec>>,
) -> Pin<Box<dyn Future<Output = Result<(), ReactantError>> + Send + 'static>> {
let sender = self.sender.clone();
tracing::debug!("TokioMpscReactantRaw::react called");
Box::pin(async move {
tracing::debug!("TokioMpscReactantRaw::react sending to channel");
if let Err(e) = sender.try_send(p) {
tracing::warn!("TokioMpscReactantRaw failed to send payload: {e}");
} else {
tracing::debug!("TokioMpscReactantRaw sent to channel successfully");
}
Ok(())
})
}
fn erase_raw(self: Box<Self>) -> Arc<dyn ReactantRawErased + Send + Sync + 'static> {
erase_reactant_raw::<DebugStruct, DebugCodec, _>(self)
}
}
pub struct SynapseExternalInprocess<T, C>
where
C: Codec<T> + CodecName + Send + Sync + 'static,
T: Send + Sync + 'static,
{
neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
dendrite_decoder: Option<Arc<DendriteDecoder<T, C>>>,
_phantom_t: PhantomData<T>,
}
impl<T, C> SynapseExternalInprocess<T, C>
where
C: Codec<T> + CodecName + Send + Sync + 'static,
T: Send + Sync + 'static,
{
pub fn new(
neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
) -> Self {
let dendrite_decoder =
if !reactants.is_empty() || !raw_reactants.is_empty() || !error_reactants.is_empty() {
Some(Arc::new(DendriteDecoder::new(
neuron.clone(),
reactants,
raw_reactants,
error_reactants,
None,
)))
} else {
None
};
Self {
neuron,
dendrite_decoder,
_phantom_t: PhantomData,
}
}
}
impl<T, C> SynapseExternal<T, C> for SynapseExternalInprocess<T, C>
where
C: Codec<T> + CodecName + Send + Sync + 'static,
T: Send + Sync + 'static,
{
fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync> {
self.neuron.clone()
}
fn transduce(
&self,
payload: Arc<PayloadRaw<T, C>>,
) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static {
let decoder = self.dendrite_decoder.clone();
Box::pin(async move {
match decoder {
Some(decoder) => {
decoder
.transduce(payload)
.await
.map_err(SynapseError::from)
}
None => Ok((vec![], vec![])),
}
})
}
fn transmit(
&self,
payload: Arc<PayloadRaw<T, C>>,
) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static {
self.transduce(payload)
}
fn react(
&mut self,
reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
) -> Result<(), SynapseError> {
if reactants.is_empty() && raw_reactants.is_empty() && error_reactants.is_empty() {
return Ok(());
}
match &self.dendrite_decoder {
Some(decoder) => {
if !reactants.is_empty() {
let _ = decoder.add_reactants(reactants);
}
if !raw_reactants.is_empty() {
let _ = decoder.add_raw_reactants(raw_reactants);
}
if !error_reactants.is_empty() {
let _ = decoder.add_error_reactants(error_reactants);
}
}
None => {
self.dendrite_decoder = Some(Arc::new(DendriteDecoder::new(
self.neuron.clone(),
reactants,
raw_reactants,
error_reactants,
None,
)));
}
}
Ok(())
}
}
pub struct GanglionExternalInprocess {
id: Uuid,
synapses_by_name:
HashMap<String, Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>>,
relevant_neurons: HashSet<String>,
ignored_neurons: HashSet<String>,
}
impl Default for GanglionExternalInprocess {
fn default() -> Self {
Self::new()
}
}
impl GanglionExternalInprocess {
pub fn new() -> Self {
Self {
id: Uuid::now_v7(),
synapses_by_name: HashMap::new(),
relevant_neurons: HashSet::new(),
ignored_neurons: HashSet::new(),
}
}
pub fn new_with_filters(
relevant_neurons: HashSet<String>,
ignored_neurons: HashSet<String>,
) -> Self {
Self {
id: Uuid::now_v7(),
synapses_by_name: HashMap::new(),
relevant_neurons,
ignored_neurons,
}
}
#[allow(dead_code)]
pub fn add_synapse(
&mut self,
name: String,
synapse: SynapseExternalInprocess<DebugStruct, DebugCodec>,
) {
let erased_synapse = erase_synapse_external(synapse);
self.synapses_by_name.insert(name, erased_synapse);
}
pub fn create_synapse<T, C>(
neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
error_reactants: Vec<Arc<dyn crate::reactant::ErrorReactant<T, C> + Send + Sync>>,
) -> SynapseExternalInprocess<T, C>
where
T: Send + Sync + 'static,
C: Codec<T> + CodecName + Send + Sync + 'static,
SynapseExternalInprocess<T, C>: Send + Sync + 'static,
{
SynapseExternalInprocess::<T, C>::new(neuron, reactants, raw_reactants, error_reactants)
}
#[allow(dead_code)]
pub fn populate_synapse<T, C>(
&mut self,
neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
) -> Result<(), String>
where
T: Send + Sync + 'static,
C: Codec<T> + CodecName + Send + Sync + 'static,
{
let name = neuron.name();
if self.synapses_by_name.contains_key(&name) {
return Err(format!("Synapse with name '{name}' already exists"));
}
let synapse =
SynapseExternalInprocess::<T, C>::new(neuron, reactants, raw_reactants, vec![]);
let erased_synapse = erase_synapse_external(synapse);
self.synapses_by_name.insert(name, erased_synapse);
Ok(())
}
fn get_synapse_by_name(
&self,
name: &str,
) -> Option<Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>> {
self.synapses_by_name.get(name).cloned()
}
}
impl Ganglion for GanglionExternalInprocess {
fn capable<T, C>(&mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
where
C: Codec<T> + CodecName + Send + Sync + 'static,
T: Send + Sync + 'static,
{
let neuron_name = neuron.name();
if !self.relevant_neurons.is_empty() && !self.relevant_neurons.contains(&neuron_name) {
return false;
}
if !self.ignored_neurons.is_empty() && self.ignored_neurons.contains(&neuron_name) {
return false;
}
true
}
fn adapt<T, C>(
&mut self,
neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>>
where
C: Codec<T> + CodecName + Send + Sync + 'static,
T: Send + Sync + 'static,
{
if !self.capable(neuron.clone()) {
return Box::pin(async move { Ok(()) });
}
let neuron_name = neuron.name();
if self.synapses_by_name.contains_key(&neuron_name) {
return Box::pin(async move {
Ok(()) });
}
let synapse = Self::create_synapse(neuron, vec![], vec![], vec![]);
let erased_synapse = erase_synapse_external(synapse);
self.synapses_by_name
.insert(neuron_name, erased_synapse);
Box::pin(async move { Ok(()) })
}
}
impl GanglionExternal for GanglionExternalInprocess {
fn transmit(
&mut self,
payload: Arc<dyn PayloadErased + Send + Sync + 'static>,
) -> Pin<Box<dyn Future<Output = Result<Vec<()>, GanglionError>> + Send + 'static>> {
let neuron_name = payload.get_neuron_name();
tracing::debug!(neuron = %neuron_name, "GanglionExternalInprocess::transmit called");
if let Some(synapse_lock) = self.get_synapse_by_name(&neuron_name) {
tracing::debug!(neuron = %neuron_name, "GanglionExternalInprocess::transmit found synapse");
let synapse_lock_clone = synapse_lock.clone();
let erased_neuron = payload.get_erased_neuron();
let value_any = payload.get_value();
let ganglion_id = self.id;
let ganglion_name = struct_name_of_type::<Self>().to_string();
let encode_res = erased_neuron.encode_any(value_any);
let payload_clone = payload.clone();
let payload_inner = payload_clone.clone();
Box::pin(
async move {
tracing::debug!("GanglionExternalInprocess::transmit - starting async block");
let encoded_value = match encode_res {
Ok(encoded) => encoded,
Err(e) => {
tracing::error!("Error encoding payload value: {e}");
return Err(GanglionError::Encode {
neuron_name,
ganglion_name,
ganglion_id,
});
}
};
let (p_type_id, c_type_id) = {
tracing::debug!("GanglionExternalInprocess::transmit - acquiring synapse read lock for types");
let guard = synapse_lock_clone.read();
(guard.payload_type_id(), guard.codec_type_id())
};
let payload_raw_erased = Arc::new(SimplePayloadRawErased {
bytes: Arc::new(encoded_value),
neuron_name: neuron_name.clone(),
trace: payload_inner.get_trace_context(),
payload_type_id: p_type_id,
codec_type_id: c_type_id,
});
let future = {
tracing::debug!("GanglionExternalInprocess::transmit - acquiring synapse read lock for transmit");
let synapse_guard = synapse_lock_clone.read();
synapse_guard.transmit_erased(payload_raw_erased)
};
tracing::debug!("GanglionExternalInprocess::transmit - awaiting transmit_erased future");
let res = future.await.map(|r| r.0).map_err(|e| match e {
crate::synapse::SynapseError::QueueFull { neuron_name: _ } => {
GanglionError::QueueFull {
neuron_name: neuron_name.clone(),
ganglion_name: ganglion_name.clone(),
ganglion_id,
}
}
_ => GanglionError::Transmit {
neuron_name: neuron_name.clone(),
ganglion_name: ganglion_name.clone(),
ganglion_id,
message: e.to_string(),
},
});
tracing::debug!("GanglionExternalInprocess::transmit - finished");
res
}
.instrument(payload_clone.span_debug("GanglionExternalInprocess::transmit")),
)
} else {
tracing::debug!(neuron = %neuron_name, "GanglionExternalInprocess::transmit synapse not found");
let ganglion_id = self.id;
let ganglion_name = struct_name_of_type::<Self>().to_string();
Box::pin(async move {
Err(GanglionError::SynapseNotFound {
neuron_name,
ganglion_name,
ganglion_id,
})
})
}
}
fn transmit_encoded(
&mut self,
payload: Arc<dyn PayloadRawErased + Send + Sync + 'static>,
) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), GanglionError>> + Send + 'static>>
{
let neuron_name = payload.get_neuron_name();
tracing::debug!(neuron = %neuron_name, "GanglionExternalInprocess::transmit_encoded called");
if let Some(synapse_lock) = self.get_synapse_by_name(&neuron_name) {
tracing::debug!(neuron = %neuron_name, "GanglionExternalInprocess::transmit_encoded found synapse");
let synapse_lock_clone = synapse_lock.clone();
let ganglion_id = self.id;
let ganglion_name = struct_name_of_type::<GanglionExternalInprocess>().to_string();
Box::pin(async move {
tracing::debug!("GanglionExternalInprocess::transmit_encoded - starting async block");
let future = {
tracing::debug!("GanglionExternalInprocess::transmit_encoded - acquiring synapse read lock");
let synapse_guard = synapse_lock_clone.read();
synapse_guard.transmit_erased(payload)
};
tracing::debug!("GanglionExternalInprocess::transmit_encoded - awaiting transmit_erased future");
let res = future.await.map_err(|e| match e {
crate::synapse::SynapseError::QueueFull { neuron_name: _ } => {
GanglionError::QueueFull {
neuron_name: neuron_name.clone(),
ganglion_name: ganglion_name.clone(),
ganglion_id,
}
}
_ => GanglionError::Transmit {
neuron_name: neuron_name.clone(),
ganglion_name: ganglion_name.clone(),
ganglion_id,
message: e.to_string(),
},
});
tracing::debug!("GanglionExternalInprocess::transmit_encoded - finished");
res
})
} else {
tracing::debug!(neuron = %neuron_name, "GanglionExternalInprocess::transmit_encoded synapse not found");
let ganglion_id = self.id;
let ganglion_name = struct_name_of_type::<GanglionExternalInprocess>().to_string();
Box::pin(async move {
Err(GanglionError::SynapseNotFound {
neuron_name,
ganglion_name,
ganglion_id,
})
})
}
}
fn react(
&mut self,
neuron_name: String,
reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
raw_reactants: Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
error_reactants: Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
let synapse_lock_opt = self.get_synapse_by_name(&neuron_name);
if synapse_lock_opt.is_none() {
let ganglion_id = self.id;
let ganglion_name = struct_name_of_type::<GanglionExternalInprocess>().to_string();
return Box::pin(async move {
Err(GanglionError::SynapseNotFound {
neuron_name,
ganglion_name,
ganglion_id,
})
});
}
let synapse_lock = synapse_lock_opt.unwrap();
let mut synapse_guard = synapse_lock.write();
synapse_guard.react_erased(reactants, raw_reactants, error_reactants);
Box::pin(async move { Ok(()) })
}
fn react_many(
&mut self,
reactions: HashMap<
String,
(
Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
Vec<Arc<dyn ReactantRawErased + Send + Sync + 'static>>,
Vec<Arc<dyn ErrorReactantErased + Send + Sync>>,
),
>,
) -> Pin<Box<dyn Future<Output = Result<(), GanglionError>> + Send + 'static>> {
for (name, (rs, rrs, ers)) in reactions {
if let Some(synapse_lock) = self.get_synapse_by_name(&name) {
let mut synapse_guard = synapse_lock.write();
synapse_guard.react_erased(rs, rrs, ers);
} else {
let ganglion_id = self.id;
let ganglion_name = struct_name_of_type::<GanglionExternalInprocess>().to_string();
return Box::pin(async move {
Err(GanglionError::SynapseNotFound {
neuron_name: name,
ganglion_name,
ganglion_id,
})
});
}
}
Box::pin(async move { Ok(()) })
}
fn unique_id(&self) -> Uuid {
self.id
}
}