1use crate::backpressure::{BackpressureConfig, BackpressureQueue};
8use crate::codec::{Codec, CodecName};
9use crate::dendrite::{Dendrite, DendriteError};
10use crate::erasure::neuron::{NeuronErased, NeuronErasedWrapper};
11use crate::erasure::reactant::ReactantErased;
12use crate::logging::LogTrace;
13use crate::neuron::Neuron;
14use crate::payload::{Payload, PayloadRaw};
15use crate::reactant::{ErrorReactant, Reactant, ReactantRaw};
16use std::future::Future;
17use std::marker::PhantomData;
18use std::pin::Pin;
19use std::sync::Arc;
20use parking_lot::RwLock;
21use thiserror::Error;
22
23#[derive(Error, Debug)]
24pub enum SynapseError {
25 #[error("Queue for neuron '{neuron_name}' is full")]
26 QueueFull { neuron_name: String },
27 #[error(transparent)]
28 Dendrite(#[from] DendriteError),
29 #[error("Type conversion failed for neuron '{neuron_name}'")]
30 NeuronTypeConversion { neuron_name: String },
31 #[error("Type conversion failed for reactant in neuron '{neuron_name}'")]
32 ReactantTypeConversion { neuron_name: String },
33 #[error("No dendrite available for processing in neuron '{neuron_name}'")]
34 NoDendrite { neuron_name: String },
35}
36
37use tracing::Instrument;
38
39pub trait SynapseInternal<T, C>
40where
41 C: Codec<T> + CodecName + Send + Sync + 'static,
42 T: Send + Sync + 'static,
43{
44 fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync>;
45 fn transduce(
46 &self,
47 payload: Arc<Payload<T, C>>,
48 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>>;
49 fn transmit(
50 &self,
51 payload: Arc<Payload<T, C>>,
52 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>>;
53 fn react(
54 &mut self,
55 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
56 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
57 ) -> Result<(), SynapseError>;
58}
59
60pub trait SynapseExternal<T, C>
61where
62 C: Codec<T> + CodecName + Send + Sync + 'static,
63 T: Send + Sync + 'static,
64{
65 fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync>;
66 fn transduce(
67 &self,
68 payload: Arc<PayloadRaw<T, C>>,
69 ) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static;
70 fn transmit(
71 &self,
72 payload: Arc<PayloadRaw<T, C>>,
73 ) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static;
74 fn react(
75 &mut self,
76 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
77 raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
78 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
79 ) -> Result<(), SynapseError>;
80}
81
/// Something that can send a byte buffer to a named topic.
///
/// Implementors must be `Send + Sync`.
pub trait RawSender: Send + Sync {
    /// Sends `data` to `topic`.
    ///
    /// The returned boxed future is `Send` and resolves to `Ok(())` on
    /// success or to a human-readable error message on failure.
    #[rustfmt::skip]
    fn send(&self, topic: &str, data: Vec<u8>) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
}
90
91pub struct BackpressureSender<S: RawSender> {
93 queue: Arc<BackpressureQueue<(String, Vec<u8>)>>,
94 _marker: PhantomData<S>,
95}
96
97impl<S: RawSender + 'static> BackpressureSender<S> {
98 pub fn new(inner: S, config: BackpressureConfig, neuron_name: String) -> Self {
99 let inner_arc = Arc::new(inner);
100 let inner_clone = inner_arc.clone();
101
102 let queue = BackpressureQueue::<(String, Vec<u8>)>::new(
103 neuron_name,
104 config,
105 move |(topic, data)| {
106 let s = inner_clone.clone();
107 async move {
108 if let Err(e) = s.send(&topic, data).await {
109 eprintln!("BackpressureSender failed to send: {}", e);
110 }
111 }
112 },
113 );
114
115 Self {
116 queue: Arc::new(queue),
117 _marker: PhantomData,
118 }
119 }
120}
121
122impl<S: RawSender + 'static> RawSender for BackpressureSender<S> {
123 fn send(
124 &self,
125 topic: &str,
126 data: Vec<u8>,
127 ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> {
128 let q = self.queue.clone();
129 let topic = topic.to_string();
130 Box::pin(async move {
131 q.push((topic, data)).await.map_err(|e| e.to_string())
132 })
133 }
134}
135
136pub struct BackpressureExternalSynapse<T, C, S>
138where
139 C: Codec<T> + CodecName + Send + Sync + 'static,
140 T: Send + Sync + 'static,
141 S: SynapseExternal<T, C> + Send + Sync + 'static,
142{
143 inner: Arc<RwLock<S>>,
144 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
145 queue: Arc<BackpressureQueue<Arc<PayloadRaw<T, C>>>>,
146 _phantom: PhantomData<(T, C)>,
147}
148
149impl<T, C, S> BackpressureExternalSynapse<T, C, S>
150where
151 C: Codec<T> + CodecName + Send + Sync + 'static,
152 T: Send + Sync + 'static,
153 S: SynapseExternal<T, C> + Send + Sync + 'static,
154{
155 pub fn new(synapse: S, config: BackpressureConfig) -> Self {
156 let neuron = synapse.neuron();
157 let neuron_name = neuron.name();
158 let synapse_arc = Arc::new(RwLock::new(synapse));
159 let inner_clone = synapse_arc.clone();
160
161 let queue = BackpressureQueue::new(
162 neuron_name,
163 config,
164 move |payload: Arc<PayloadRaw<T, C>>| {
165 let s = inner_clone.clone();
166 async move {
167 let future = {
168 let guard = s.read();
169 guard.transmit(payload)
170 };
171 let _ = future.await;
172 }
173 },
174 );
175
176 Self {
177 inner: synapse_arc,
178 neuron,
179 queue: Arc::new(queue),
180 _phantom: PhantomData,
181 }
182 }
183}
184
185impl<T, C, S> SynapseExternal<T, C> for BackpressureExternalSynapse<T, C, S>
186where
187 C: Codec<T> + CodecName + Send + Sync + 'static,
188 T: Send + Sync + 'static,
189 S: SynapseExternal<T, C> + Send + Sync + 'static,
190{
191 fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync> {
192 self.neuron.clone()
193 }
194
195 fn transduce(
196 &self,
197 payload: Arc<PayloadRaw<T, C>>,
198 ) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static {
199 self.transmit(payload)
200 }
201
202 fn transmit(
203 &self,
204 payload: Arc<PayloadRaw<T, C>>,
205 ) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static {
206 let q = self.queue.clone();
207 Box::pin(async move {
208 q.push(payload).await?;
209 Ok((vec![], vec![]))
210 })
211 }
212
213 fn react(
214 &mut self,
215 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
216 raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
217 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
218 ) -> Result<(), SynapseError> {
219 let mut guard = self.inner.write();
220 guard.react(reactants, raw_reactants, error_reactants)
221 }
222}
223
224pub struct SynapseInprocess<T, C>
225where
226 C: Codec<T> + CodecName + Send + Sync + 'static,
227 T: Sync + Send + 'static,
228{
229 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
230 dendrite: Option<Dendrite<T, C>>,
231 _codec_marker: PhantomData<fn() -> &'static ()>,
232}
233
234impl<T, C> SynapseInprocess<T, C>
235where
236 C: Codec<T> + CodecName + Send + Sync + 'static,
237 T: Sync + Send + 'static,
238{
239 pub fn new(
240 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
241 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
242 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
243 ) -> Self {
244 let dendrite = if !reactants.is_empty() || !error_reactants.is_empty() {
245 Some(Dendrite::new(neuron.clone(), reactants, error_reactants))
246 } else {
247 None
248 };
249 Self {
250 neuron,
251 dendrite,
252 _codec_marker: PhantomData,
253 }
254 }
255
256 pub fn from_erased(
259 neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
260 reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
261 ) -> Option<Self>
262 where
263 T: 'static,
264 C: 'static,
265 {
266 use std::any::TypeId;
267
268 let neuron_type_id = neuron.payload_type_id();
270 let codec_type_id = neuron.codec_type_id();
271
272 if neuron_type_id != TypeId::of::<T>() || codec_type_id != TypeId::of::<C>() {
273 return None;
274 }
275
276 let typed_neuron = match neuron.as_any().downcast_ref::<NeuronErasedWrapper<T, C>>() {
278 Some(wrapper) => wrapper.get_typed_neuron(),
279 None => return None,
280 };
281
282 let typed_reactants: Vec<_> = reactants
284 .into_iter()
285 .filter_map(|erased_reactant| {
286 if erased_reactant.payload_type_id() != TypeId::of::<T>()
288 || erased_reactant.codec_type_id() != TypeId::of::<C>()
289 {
290 return None;
291 }
292
293 let any_arc = erased_reactant.clone_to_any();
295 any_arc
296 .downcast::<Arc<dyn Reactant<T, C> + Send + Sync + 'static>>()
297 .ok()
298 .map(|boxed_arc| (*boxed_arc).clone())
299 })
300 .collect();
301
302 Some(Self::new(typed_neuron.clone(), typed_reactants, vec![]))
304 }
305}
306
307impl<T, C> SynapseInternal<T, C> for SynapseInprocess<T, C>
308where
309 C: Codec<T> + CodecName + Send + Sync + 'static,
310 T: Sync + Send + 'static,
311{
312 fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync> {
313 self.neuron.clone()
314 }
315
316 fn transduce(
317 &self,
318 payload: Arc<Payload<T, C>>,
319 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
320 let span = payload.span_debug("SynapseInprocess::transduce");
321 match &self.dendrite {
322 Some(dendrite) => {
323 let future = dendrite.transduce(payload);
324 Box::pin(
325 async move {
326 tracing::debug!("SynapseInprocess::transduce calling dendrite.transduce");
327 future.await.map_err(SynapseError::from)
328 }
329 .instrument(span),
330 )
331 }
332 None => Box::pin(
333 async move {
334 tracing::debug!("SynapseInprocess::transduce no dendrite, returning empty vec");
335 Ok(vec![])
336 }
337 .instrument(span),
338 ),
339 }
340 }
341
342 fn transmit(
343 &self,
344 payload: Arc<Payload<T, C>>,
345 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
346 let span = payload.span_debug("SynapseInprocess::transmit");
347 let future = self.transduce(payload);
349 Box::pin(
350 async move {
351 tracing::debug!("SynapseInprocess::transmit called");
352 future.await
353 }
354 .instrument(span),
355 )
356 }
357
358 fn react(
359 &mut self,
360 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
361 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
362 ) -> Result<(), SynapseError> {
363 if reactants.is_empty() && error_reactants.is_empty() {
364 return Ok(());
365 }
366
367 match &self.dendrite {
368 Some(dendrite) => {
369 dendrite.add_reactants(reactants)?;
371 dendrite.add_error_reactants(error_reactants)?;
372 }
373 None => {
374 self.dendrite = Some(Dendrite::new(
376 self.neuron.clone(),
377 reactants,
378 error_reactants,
379 ));
380 }
381 }
382 Ok(())
383 }
384}
385
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logging::TraceContext;
    use crate::neuron::NeuronImpl;
    use crate::payload::PayloadRaw;
    use crate::test_utils::{
        DebugCodec, DebugStruct, SynapseExternalInprocess, TokioMpscReactant, TokioMpscReactantRaw,
        test_namespace,
    };
    use tokio::sync::mpsc::channel;
    use uuid::Uuid;

    /// Two reactants share one channel, so every transmit must deliver the
    /// payload twice with its value and trace ids intact.
    #[tokio::test]
    async fn test_synapse_inprocess_transmit() {
        let ns = test_namespace();

        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);

        let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> = vec![
            Arc::new(TokioMpscReactant { sender: tx.clone() }),
            Arc::new(TokioMpscReactant { sender: tx.clone() }),
        ];
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> = Arc::new(neuron_impl);
        let synapse = SynapseInprocess::new(neuron.clone(), reactants, vec![]);

        let debug_struct = Arc::new(DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        });
        let correlation_id = Uuid::now_v7();
        let span_id = Uuid::now_v7().as_u128() as u64;
        // Fail here, with the real error, if the transmit itself fails.
        synapse
            .transmit(
                Payload::builder()
                    .value((*debug_struct).clone())
                    .correlation_id(correlation_id)
                    .neuron(neuron.clone())
                    .span_id(span_id)
                    .build()
                    .unwrap(),
            )
            .await
            .expect("first transmit should succeed");

        assert_eq!(rx.len(), 2);

        let p = rx.recv().await.unwrap();
        assert_eq!(p.value, debug_struct);
        assert_eq!(p.correlation_id(), correlation_id);
        assert_eq!(p.span_id(), span_id);
        assert_eq!(rx.len(), 1);
        let p2 = rx.recv().await.unwrap();
        assert_eq!(p2.value, debug_struct);
        assert_eq!(p2.correlation_id(), correlation_id);
        assert_eq!(p2.span_id(), span_id);
        assert_eq!(rx.len(), 0);

        let debug_struct_2 = Arc::new(DebugStruct {
            foo: 49,
            bar: "foo_bar".to_owned(),
        });
        let correlation_id_2 = Uuid::now_v7();
        let span_id_2 = Uuid::now_v7().as_u128() as u64;
        synapse
            .transmit(
                Payload::builder()
                    .value((*debug_struct_2).clone())
                    .correlation_id(correlation_id_2)
                    .neuron(neuron.clone())
                    .span_id(span_id_2)
                    .build()
                    .unwrap(),
            )
            .await
            .expect("second transmit should succeed");

        // Same check as after the first transmit: both reactants fired.
        assert_eq!(rx.len(), 2);

        let p3 = rx.recv().await.unwrap();
        assert_eq!(p3.value, debug_struct_2);
        assert_eq!(p3.correlation_id(), correlation_id_2);
        assert_eq!(p3.span_id(), span_id_2);
        assert_eq!(rx.len(), 1);
        let p4 = rx.recv().await.unwrap();
        assert_eq!(p4.value, debug_struct_2);
        assert_eq!(p4.correlation_id(), correlation_id_2);
        assert_eq!(p4.span_id(), span_id_2);
        assert_eq!(rx.len(), 0);
    }

    /// A synapse built without reactants has no dendrite and must still
    /// transmit successfully, yielding an empty result.
    #[tokio::test]
    async fn test_synapse_inprocess_with_none_reactants() {
        let ns = test_namespace();
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> = Arc::new(neuron_impl);

        let synapse = SynapseInprocess::new(neuron.clone(), vec![], vec![]);

        let debug_struct = Arc::new(DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        });
        let correlation_id = Uuid::now_v7();
        let span_id = Uuid::now_v7().as_u128() as u64;

        let result = synapse
            .transmit(
                Payload::builder()
                    .value((*debug_struct).clone())
                    .correlation_id(correlation_id)
                    .neuron(neuron.clone())
                    .span_id(span_id)
                    .build()
                    .unwrap(),
            )
            .await;

        assert_eq!(
            result.expect("Should succeed").len(),
            0,
            "Should return empty vector when dendrite is None"
        );
    }

    /// The external in-process synapse without any reactants must succeed
    /// and report empty results for both typed and raw reactants.
    #[tokio::test]
    async fn test_synapse_external_with_none_reactants() {
        let ns = test_namespace();
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> =
            Arc::new(neuron_impl.clone());

        let synapse = SynapseExternalInprocess::new(neuron.clone(), vec![], vec![], vec![]);

        let debug_struct_value = DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        };
        let debug_struct_arc = Arc::new(debug_struct_value);
        let correlation_id = Uuid::now_v7();
        let direct_neuron_encoder: NeuronImpl<DebugStruct, DebugCodec> =
            NeuronImpl::new(ns.clone());
        let encoded = direct_neuron_encoder
            .encode(debug_struct_arc.as_ref())
            .expect("Failed to encode");
        let span_id = Uuid::now_v7().as_u128() as u64;

        let result = synapse
            .transmit(Arc::new(PayloadRaw {
                value: Arc::new(encoded.clone()),
                neuron: neuron.clone(),
                trace: TraceContext::from_parts(correlation_id, span_id, None),
            }))
            .await
            .expect("Should succeed");

        assert_eq!(
            result.0.len(),
            0,
            "Should return empty vector for reactants when dendrite_decoder is None"
        );
        assert_eq!(
            result.1.len(),
            0,
            "Should return empty vector for raw_reactants when dendrite_decoder is None"
        );
    }

    /// One typed and one raw reactant: each transmit must deliver the
    /// decoded value to the typed channel and the encoded bytes to the raw
    /// channel, both with the original trace ids.
    #[tokio::test]
    async fn test_synapse_external_inprocess_transmit() {
        let ns = test_namespace();

        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
        let (tx_raw, mut rx_raw) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(1);

        let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
            vec![Arc::new(TokioMpscReactant { sender: tx.clone() })];
        let raw_reactants: Vec<Arc<dyn ReactantRaw<DebugStruct, DebugCodec> + Send + Sync>> =
            vec![Arc::new(TokioMpscReactantRaw {
                sender: tx_raw.clone(),
            })];
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> =
            Arc::new(neuron_impl.clone());
        let synapse =
            SynapseExternalInprocess::new(neuron.clone(), reactants, raw_reactants, vec![]);

        let debug_struct_value = DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        };
        let debug_struct_arc = Arc::new(debug_struct_value);

        let correlation_id = Uuid::now_v7();
        let direct_neuron_encoder: NeuronImpl<DebugStruct, DebugCodec> =
            NeuronImpl::new(ns.clone());
        let encoded = direct_neuron_encoder
            .encode(debug_struct_arc.as_ref())
            .expect("Failed to encode");

        let span_id = Uuid::now_v7().as_u128() as u64;

        // Surface a transmit failure directly instead of as a later timeout.
        synapse
            .transmit(Arc::new(PayloadRaw {
                value: Arc::new(encoded.clone()),
                neuron: neuron.clone(),
                trace: TraceContext::from_parts(correlation_id, span_id, None),
            }))
            .await
            .expect("first transmit should succeed");

        let p = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
            .await
            .expect("Timeout rx1")
            .expect("Closed rx1");
        assert_eq!(p.value, debug_struct_arc);
        assert_eq!(p.correlation_id(), correlation_id);
        assert_eq!(p.span_id(), span_id);

        let p_raw = tokio::time::timeout(std::time::Duration::from_millis(100), rx_raw.recv())
            .await
            .expect("Timeout raw_rx1")
            .expect("Closed raw_rx1");
        assert_eq!(p_raw.value.as_slice(), encoded.as_slice());
        assert_eq!(p_raw.correlation_id(), correlation_id);
        assert_eq!(p_raw.span_id(), span_id);

        let debug_struct_2_value = DebugStruct {
            foo: 49,
            bar: "foo_bar".to_owned(),
        };
        let debug_struct_2_arc = Arc::new(debug_struct_2_value);
        let correlation_id_2 = Uuid::now_v7();
        let encoded_2 = direct_neuron_encoder
            .encode(debug_struct_2_arc.as_ref())
            .expect("Failed to encode");
        let span_id_2 = Uuid::now_v7().as_u128() as u64;
        synapse
            .transmit(Arc::new(PayloadRaw {
                value: Arc::new(encoded_2.clone()),
                neuron: neuron.clone(),
                trace: TraceContext::from_parts(correlation_id_2, span_id_2, None),
            }))
            .await
            .expect("second transmit should succeed");

        let p2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
            .await
            .expect("Timeout rx2")
            .expect("Closed rx2");
        assert_eq!(p2.value, debug_struct_2_arc);
        assert_eq!(p2.correlation_id(), correlation_id_2);
        assert_eq!(p2.span_id(), span_id_2);

        let p_raw2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx_raw.recv())
            .await
            .expect("Timeout raw_rx2")
            .expect("Closed raw_rx2");
        assert_eq!(p_raw2.value.as_slice(), encoded_2.as_slice());
        assert_eq!(p_raw2.correlation_id(), correlation_id_2);
        assert_eq!(p_raw2.span_id(), span_id_2);
    }
}