// plexor_core/synapse.rs

// Copyright 2025 Alecks Gates
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use crate::codec::{Codec, CodecName};
8use crate::dendrite::{Dendrite, DendriteError};
9use crate::erasure::neuron::{NeuronErased, NeuronErasedWrapper};
10use crate::erasure::reactant::ReactantErased;
11use crate::logging::LogTrace;
12use crate::neuron::Neuron;
13use crate::payload::{Payload, PayloadRaw};
14use crate::reactant::{ErrorReactant, Reactant, ReactantRaw};
15use std::collections::VecDeque;
16use std::future::Future;
17use std::marker::PhantomData;
18use std::pin::Pin;
19use std::sync::{Arc, Mutex as StdMutex, RwLock};
20use thiserror::Error;
21
22#[derive(Error, Debug)]
23pub enum SynapseError {
24    #[error("Queue for neuron '{neuron_name}' is full")]
25    QueueFull { neuron_name: String },
26    #[error(transparent)]
27    Dendrite(#[from] DendriteError),
28    #[error("Type conversion failed for neuron '{neuron_name}'")]
29    NeuronTypeConversion { neuron_name: String },
30    #[error("Type conversion failed for reactant in neuron '{neuron_name}'")]
31    ReactantTypeConversion { neuron_name: String },
32    #[error("No dendrite available for processing in neuron '{neuron_name}'")]
33    NoDendrite { neuron_name: String },
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
37pub enum BackpressureStrategy {
38    Block,
39    DropOldest,
40    DropNewest,
41    Reject,
42}
43
44#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
45pub struct BackpressureConfig {
46    pub queue_size: usize,
47    pub strategy: BackpressureStrategy,
48}
49
50impl Default for BackpressureConfig {
51    fn default() -> Self {
52        Self {
53            queue_size: 1000,
54            strategy: BackpressureStrategy::Block,
55        }
56    }
57}
58
59use tokio::sync::Notify;
60use tracing::Instrument;
61
62/// A helper to manage backpressure for a stream of items.
63pub struct BackpressureQueue<T: Send + 'static> {
64    inner: Arc<BackpressureQueueInner<T>>,
65}
66
67struct BackpressureQueueInner<T: Send + 'static> {
68    queue: StdMutex<VecDeque<T>>,
69
70    config: BackpressureConfig,
71
72    neuron_name: String,
73
74    // Notify the worker when a new item is added
75    item_added: Notify,
76
77    // Notify the producers when an item is removed (for Block strategy)
78    item_removed: Notify,
79}
80
81impl<T: Send + 'static> BackpressureQueue<T> {
82    pub fn new<F, Fut>(neuron_name: String, config: BackpressureConfig, mut processor: F) -> Self
83    where
84        F: FnMut(T) -> Fut + Send + 'static,
85        Fut: Future<Output = ()> + Send + 'static,
86    {
87        let inner = Arc::new(BackpressureQueueInner {
88            queue: StdMutex::new(VecDeque::with_capacity(config.queue_size)),
89
90            config,
91
92            neuron_name: neuron_name.clone(),
93
94            item_added: Notify::new(),
95
96            item_removed: Notify::new(),
97        });
98
99        let worker_inner = inner.clone();
100
101        tokio::spawn(async move {
102            loop {
103                let item = {
104                    let mut queue = worker_inner.queue.lock().unwrap();
105
106                    if let Some(item) = queue.pop_front() {
107                        worker_inner.item_removed.notify_waiters();
108
109                        Some(item)
110                    } else {
111                        None
112                    }
113                };
114
115                if let Some(item) = item {
116                    processor(item).await;
117                } else {
118                    // Wait for a new item
119
120                    worker_inner.item_added.notified().await;
121                }
122            }
123        });
124
125        Self { inner }
126    }
127
128    pub async fn push(&self, item: T) -> Result<(), SynapseError> {
129        loop {
130            // Use a block to ensure the lock is dropped before any await
131            let should_wait = {
132                let mut queue = self.inner.queue.lock().unwrap();
133
134                if queue.len() < self.inner.config.queue_size {
135                    queue.push_back(item);
136                    self.inner.item_added.notify_one();
137                    return Ok(());
138                }
139
140                match self.inner.config.strategy {
141                    BackpressureStrategy::Block => true,
142                    BackpressureStrategy::DropOldest => {
143                        queue.pop_front();
144                        queue.push_back(item);
145                        self.inner.item_added.notify_one();
146                        tracing::warn!(
147                            neuron = %self.inner.neuron_name,
148                            "Backpressure: Dropped oldest message (queue full)"
149                        );
150                        return Ok(());
151                    }
152                    BackpressureStrategy::DropNewest => {
153                        tracing::warn!(
154                            neuron = %self.inner.neuron_name,
155                            "Backpressure: Dropped newest message (queue full)"
156                        );
157                        return Ok(()); // Just drop it
158                    }
159                    BackpressureStrategy::Reject => {
160                        tracing::warn!(
161                            neuron = %self.inner.neuron_name,
162                            "Backpressure: Rejected message (queue full)"
163                        );
164                        return Err(SynapseError::QueueFull {
165                            neuron_name: self.inner.neuron_name.clone(),
166                        });
167                    }
168                }
169            };
170
171            if should_wait {
172                self.inner.item_removed.notified().await;
173            }
174        }
175    }
176}
177
178pub trait SynapseInternal<T, C>
179where
180    C: Codec<T> + CodecName + Send + Sync + 'static,
181    T: Send + Sync + 'static,
182{
183    fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync>;
184    fn transduce(
185        &self,
186        payload: Arc<Payload<T, C>>,
187    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>>;
188    fn transmit(
189        &self,
190        payload: Arc<Payload<T, C>>,
191    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>>;
192    fn react(
193        &mut self,
194        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
195        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
196    ) -> Result<(), SynapseError>;
197}
198
199pub trait SynapseExternal<T, C>
200where
201    C: Codec<T> + CodecName + Send + Sync + 'static,
202    T: Send + Sync + 'static,
203{
204    fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync>;
205    fn transduce(
206        &self,
207        payload: Arc<PayloadRaw<T, C>>,
208    ) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static;
209    fn transmit(
210        &self,
211        payload: Arc<PayloadRaw<T, C>>,
212    ) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static;
213    fn react(
214        &mut self,
215        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
216        raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
217        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
218    ) -> Result<(), SynapseError>;
219}
220
/// Abstraction over a transport that can deliver raw bytes to a topic.
pub trait RawSender: Send + Sync {
    /// Sends `data` to `topic`.
    ///
    /// The returned future resolves to `Ok(())` on success or to an error message.
    fn send(
        &self,
        topic: &str,
        data: Vec<u8>,
    ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
}
229
230/// A wrapper for RawSender that applies backpressure buffering.
231pub struct BackpressureSender<S: RawSender> {
232    queue: Arc<BackpressureQueue<(String, Vec<u8>)>>,
233    _marker: PhantomData<S>,
234}
235
236impl<S: RawSender + 'static> BackpressureSender<S> {
237    pub fn new(inner: S, config: BackpressureConfig, neuron_name: String) -> Self {
238        let inner_arc = Arc::new(inner);
239        let inner_clone = inner_arc.clone();
240
241        let queue = BackpressureQueue::<(String, Vec<u8>)>::new(
242            neuron_name,
243            config,
244            move |(topic, data)| {
245                let s = inner_clone.clone();
246                async move {
247                    if let Err(e) = s.send(&topic, data).await {
248                        eprintln!("BackpressureSender failed to send: {}", e);
249                    }
250                }
251            },
252        );
253
254        Self {
255            queue: Arc::new(queue),
256            _marker: PhantomData,
257        }
258    }
259}
260
261impl<S: RawSender + 'static> RawSender for BackpressureSender<S> {
262    fn send(
263        &self,
264        topic: &str,
265        data: Vec<u8>,
266    ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> {
267        let q = self.queue.clone();
268        let topic = topic.to_string();
269        Box::pin(async move {
270            q.push((topic, data)).await.map_err(|e| e.to_string())
271        })
272    }
273}
274
275/// A wrapper for external synapses that adds backpressure management.
276pub struct BackpressureExternalSynapse<T, C, S>
277where
278    C: Codec<T> + CodecName + Send + Sync + 'static,
279    T: Send + Sync + 'static,
280    S: SynapseExternal<T, C> + Send + Sync + 'static,
281{
282    inner: Arc<RwLock<S>>,
283    neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
284    queue: Arc<BackpressureQueue<Arc<PayloadRaw<T, C>>>>,
285    _phantom: PhantomData<(T, C)>,
286}
287
288impl<T, C, S> BackpressureExternalSynapse<T, C, S>
289where
290    C: Codec<T> + CodecName + Send + Sync + 'static,
291    T: Send + Sync + 'static,
292    S: SynapseExternal<T, C> + Send + Sync + 'static,
293{
294    pub fn new(synapse: S, config: BackpressureConfig) -> Self {
295        let neuron = synapse.neuron();
296        let neuron_name = neuron.name();
297        let synapse_arc = Arc::new(RwLock::new(synapse));
298        let inner_clone = synapse_arc.clone();
299
300        let queue = BackpressureQueue::new(
301            neuron_name,
302            config,
303            move |payload: Arc<PayloadRaw<T, C>>| {
304                let s = inner_clone.clone();
305                async move {
306                    let future = {
307                        let guard = s.read().unwrap();
308                        guard.transmit(payload)
309                    };
310                    let _ = future.await;
311                }
312            },
313        );
314
315        Self {
316            inner: synapse_arc,
317            neuron,
318            queue: Arc::new(queue),
319            _phantom: PhantomData,
320        }
321    }
322}
323
324impl<T, C, S> SynapseExternal<T, C> for BackpressureExternalSynapse<T, C, S>
325where
326    C: Codec<T> + CodecName + Send + Sync + 'static,
327    T: Send + Sync + 'static,
328    S: SynapseExternal<T, C> + Send + Sync + 'static,
329{
330    fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync> {
331        self.neuron.clone()
332    }
333
334    fn transduce(
335        &self,
336        payload: Arc<PayloadRaw<T, C>>,
337    ) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static {
338        self.transmit(payload)
339    }
340
341    fn transmit(
342        &self,
343        payload: Arc<PayloadRaw<T, C>>,
344    ) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static {
345        let q = self.queue.clone();
346        Box::pin(async move {
347            q.push(payload).await?;
348            Ok((vec![], vec![]))
349        })
350    }
351
352    fn react(
353        &mut self,
354        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
355        raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
356        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
357    ) -> Result<(), SynapseError> {
358        let mut guard = self.inner.write().unwrap();
359        guard.react(reactants, raw_reactants, error_reactants)
360    }
361}
362
363pub struct SynapseInprocess<T, C>
364where
365    C: Codec<T> + CodecName + Send + Sync + 'static,
366    T: Sync + Send + 'static,
367{
368    neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
369    dendrite: Option<Dendrite<T, C>>,
370    _codec_marker: PhantomData<fn() -> &'static ()>,
371}
372
373impl<T, C> SynapseInprocess<T, C>
374where
375    C: Codec<T> + CodecName + Send + Sync + 'static,
376    T: Sync + Send + 'static,
377{
378    pub fn new(
379        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
380        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
381        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
382    ) -> Self {
383        let dendrite = if !reactants.is_empty() || !error_reactants.is_empty() {
384            Some(Dendrite::new(neuron.clone(), reactants, error_reactants))
385        } else {
386            None
387        };
388        Self {
389            neuron,
390            dendrite,
391            _codec_marker: PhantomData,
392        }
393    }
394
395    /// Create a new SynapseInprocess from type-erased neuron and reactants
396    /// Returns None if the types don't match
397    pub fn from_erased(
398        neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
399        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
400    ) -> Option<Self>
401    where
402        T: 'static,
403        C: 'static,
404    {
405        use std::any::TypeId;
406
407        // First, check if the neuron's type parameters match what we need
408        let neuron_type_id = neuron.payload_type_id();
409        let codec_type_id = neuron.codec_type_id();
410
411        if neuron_type_id != TypeId::of::<T>() || codec_type_id != TypeId::of::<C>() {
412            return None;
413        }
414
415        // Try to convert the neuron to the correct type safely using Any downcast
416        let typed_neuron = match neuron.as_any().downcast_ref::<NeuronErasedWrapper<T, C>>() {
417            Some(wrapper) => wrapper.get_typed_neuron(),
418            None => return None,
419        };
420
421        // Convert reactants
422        let typed_reactants: Vec<_> = reactants
423            .into_iter()
424            .filter_map(|erased_reactant| {
425                // Check if this reactant's type parameters match
426                if erased_reactant.payload_type_id() != TypeId::of::<T>()
427                    || erased_reactant.codec_type_id() != TypeId::of::<C>()
428                {
429                    return None;
430                }
431
432                // Try to convert to the correct type using safe Any downcast
433                let any_arc = erased_reactant.clone_to_any();
434                any_arc
435                    .downcast::<Arc<dyn Reactant<T, C> + Send + Sync + 'static>>()
436                    .ok()
437                    .map(|boxed_arc| (*boxed_arc).clone())
438            })
439            .collect();
440
441        // Create a new SynapseInprocess with the typed objects
442        Some(Self::new(typed_neuron.clone(), typed_reactants, vec![]))
443    }
444}
445
446impl<T, C> SynapseInternal<T, C> for SynapseInprocess<T, C>
447where
448    C: Codec<T> + CodecName + Send + Sync + 'static,
449    T: Sync + Send + 'static,
450{
451    fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync> {
452        self.neuron.clone()
453    }
454
455    fn transduce(
456        &self,
457        payload: Arc<Payload<T, C>>,
458    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
459        let span = payload.span_debug("SynapseInprocess::transduce");
460        let neuron_name = self.neuron.name();
461        match &self.dendrite {
462            Some(dendrite) => {
463                let future = dendrite.transduce(payload);
464                Box::pin(
465                    async move {
466                        tracing::debug!(neuron = %neuron_name, "SynapseInprocess::transduce calling dendrite.transduce");
467                        future.await.map_err(SynapseError::from)
468                    }
469                    .instrument(span),
470                )
471            }
472            None => Box::pin(
473                async move {
474                    tracing::debug!(
475                        neuron = %neuron_name,
476                        "SynapseInprocess::transduce no dendrite, returning empty vec"
477                    );
478                    Ok(vec![])
479                }
480                .instrument(span),
481            ),
482        }
483    }
484
485    fn transmit(
486        &self,
487        payload: Arc<Payload<T, C>>,
488    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
489        let span = payload.span_debug("SynapseInprocess::transmit");
490        // This is an internal synapse so transmit directly to any reactants we have
491        let future = self.transduce(payload);
492        Box::pin(
493            async move {
494                tracing::debug!("SynapseInprocess::transmit called");
495                future.await
496            }
497            .instrument(span),
498        )
499    }
500
501    fn react(
502        &mut self,
503        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
504        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
505    ) -> Result<(), SynapseError> {
506        if reactants.is_empty() && error_reactants.is_empty() {
507            return Ok(());
508        }
509
510        match &self.dendrite {
511            Some(dendrite) => {
512                // Add reactants to existing dendrite
513                dendrite.add_reactants(reactants)?;
514                dendrite.add_error_reactants(error_reactants)?;
515            }
516            None => {
517                // Create a new dendrite with the stored neuron and reactants
518                self.dendrite = Some(Dendrite::new(
519                    self.neuron.clone(),
520                    reactants,
521                    error_reactants,
522                ));
523            }
524        }
525        Ok(())
526    }
527}
528
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logging::TraceContext;
    use crate::neuron::NeuronImpl;
    use crate::payload::PayloadRaw;
    use crate::test_utils::{
        DebugCodec, DebugStruct, SynapseExternalInprocess, TokioMpscReactant, TokioMpscReactantRaw,
        test_namespace,
    };
    use tokio::sync::mpsc::channel;
    use uuid::Uuid;

    /// Shorthand for the trait-object types used throughout these tests.
    type DynNeuron = Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync>;
    type DynReactant = Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>;
    type DynReactantRaw = Arc<dyn ReactantRaw<DebugStruct, DebugCodec> + Send + Sync>;

    /// Upper bound on how long a test waits for a reactant to deliver a payload.
    const RECV_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(100);

    /// Two reactants on the same channel should each receive every transmitted payload.
    #[tokio::test]
    async fn test_synapse_inprocess_transmit() {
        let ns = test_namespace();

        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);

        let reactants: Vec<DynReactant> = vec![
            Arc::new(TokioMpscReactant { sender: tx.clone() }),
            Arc::new(TokioMpscReactant { sender: tx.clone() }),
        ];
        let neuron: DynNeuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(ns.clone()));
        let synapse = SynapseInprocess::new(neuron.clone(), reactants, vec![]);

        // First payload.
        let first_value = Arc::new(DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        });
        let first_correlation = Uuid::now_v7();
        let first_span = Uuid::now_v7().as_u128() as u64;
        let first_payload = Payload::builder()
            .value((*first_value).clone())
            .correlation_id(first_correlation)
            .neuron(neuron.clone())
            .span_id(first_span)
            .build()
            .unwrap();
        let _ = synapse.transmit(first_payload).await;

        assert_eq!(rx.len(), 2);

        let received = rx.recv().await.unwrap();
        assert_eq!(received.value, first_value);
        assert_eq!(received.correlation_id(), first_correlation);
        assert_eq!(received.span_id(), first_span);
        assert_eq!(rx.len(), 1);
        let received = rx.recv().await.unwrap();
        assert_eq!(received.value, first_value);
        assert_eq!(received.correlation_id(), first_correlation);
        assert_eq!(received.span_id(), first_span);
        assert_eq!(rx.len(), 0);

        // Second payload with different contents and trace identifiers.
        let second_value = Arc::new(DebugStruct {
            foo: 49,
            bar: "foo_bar".to_owned(),
        });
        let second_correlation = Uuid::now_v7();
        let second_span = Uuid::now_v7().as_u128() as u64;
        let second_payload = Payload::builder()
            .value((*second_value).clone())
            .correlation_id(second_correlation)
            .neuron(neuron.clone())
            .span_id(second_span)
            .build()
            .unwrap();
        let _ = synapse.transmit(second_payload).await;

        let received = rx.recv().await.unwrap();
        assert_eq!(received.value, second_value);
        assert_eq!(received.correlation_id(), second_correlation);
        assert_eq!(received.span_id(), second_span);
        assert_eq!(rx.len(), 1);
        let received = rx.recv().await.unwrap();
        assert_eq!(received.value, second_value);
        assert_eq!(received.correlation_id(), second_correlation);
        assert_eq!(received.span_id(), second_span);
        assert_eq!(rx.len(), 0);
    }

    /// With no reactants there is no dendrite, so transmit yields an empty vector.
    #[tokio::test]
    async fn test_synapse_inprocess_with_none_reactants() {
        let ns = test_namespace();
        let neuron: DynNeuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(ns.clone()));

        // Neither reactants nor error reactants are supplied.
        let synapse = SynapseInprocess::new(neuron.clone(), vec![], vec![]);

        let value = Arc::new(DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        });
        let correlation_id = Uuid::now_v7();
        let span_id = Uuid::now_v7().as_u128() as u64;

        let payload = Payload::builder()
            .value((*value).clone())
            .correlation_id(correlation_id)
            .neuron(neuron.clone())
            .span_id(span_id)
            .build()
            .unwrap();
        let result = synapse.transmit(payload).await;

        assert_eq!(
            result.expect("Should succeed").len(),
            0,
            "Should return empty vector when dendrite is None"
        );
    }

    /// An external synapse without reactants returns two empty result vectors.
    #[tokio::test]
    async fn test_synapse_external_with_none_reactants() {
        let ns = test_namespace();
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron: DynNeuron = Arc::new(neuron_impl.clone());

        // No typed, raw or error reactants are supplied.
        let synapse = SynapseExternalInprocess::new(neuron.clone(), vec![], vec![], vec![]);

        let value = Arc::new(DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        });
        let correlation_id = Uuid::now_v7();
        let encoder: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let encoded = encoder.encode(value.as_ref()).expect("Failed to encode");
        let span_id = Uuid::now_v7().as_u128() as u64;

        let raw_payload = Arc::new(PayloadRaw {
            value: Arc::new(encoded.clone()),
            neuron: Some(neuron.clone()),
            trace: TraceContext::from_parts(correlation_id, span_id, None),
        });
        let result = synapse
            .transmit(raw_payload)
            .await
            .expect("Should succeed");

        assert_eq!(
            result.0.len(),
            0,
            "Should return empty vector for reactants when dendrite_decoder is None"
        );
        assert_eq!(
            result.1.len(),
            0,
            "Should return empty vector for raw_reactants when dendrite_decoder is None"
        );
    }

    /// A raw payload reaches both the typed reactant (decoded) and the raw reactant (as bytes).
    #[tokio::test]
    async fn test_synapse_external_inprocess_transmit() {
        let ns = test_namespace();

        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
        let (tx_raw, mut rx_raw) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(1);

        let reactants: Vec<DynReactant> = vec![Arc::new(TokioMpscReactant { sender: tx.clone() })];
        let raw_reactants: Vec<DynReactantRaw> = vec![Arc::new(TokioMpscReactantRaw {
            sender: tx_raw.clone(),
        })];
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron: DynNeuron = Arc::new(neuron_impl.clone());
        let synapse =
            SynapseExternalInprocess::new(neuron.clone(), reactants, raw_reactants, vec![]);

        // First payload.
        let first_value = Arc::new(DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        });
        let first_correlation = Uuid::now_v7();
        let encoder: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let first_encoded = encoder
            .encode(first_value.as_ref())
            .expect("Failed to encode");
        let first_span = Uuid::now_v7().as_u128() as u64;

        let first_raw = Arc::new(PayloadRaw {
            value: Arc::new(first_encoded.clone()),
            neuron: Some(neuron.clone()),
            trace: TraceContext::from_parts(first_correlation, first_span, None),
        });
        let _ = synapse.transmit(first_raw).await;

        let typed = tokio::time::timeout(RECV_TIMEOUT, rx.recv())
            .await
            .expect("Timeout rx1")
            .expect("Closed rx1");
        assert_eq!(typed.value, first_value);
        assert_eq!(typed.correlation_id(), first_correlation);
        assert_eq!(typed.span_id(), first_span);

        let raw = tokio::time::timeout(RECV_TIMEOUT, rx_raw.recv())
            .await
            .expect("Timeout raw_rx1")
            .expect("Closed raw_rx1");
        assert_eq!(raw.value.as_slice(), first_encoded.as_slice());
        assert_eq!(raw.correlation_id(), first_correlation);
        assert_eq!(raw.span_id(), first_span);

        // Second payload with different contents and trace identifiers.
        let second_value = Arc::new(DebugStruct {
            foo: 49,
            bar: "foo_bar".to_owned(),
        });
        let second_correlation = Uuid::now_v7();
        let second_encoded = encoder
            .encode(second_value.as_ref())
            .expect("Failed to encode");
        let second_span = Uuid::now_v7().as_u128() as u64;

        let second_raw = Arc::new(PayloadRaw {
            value: Arc::new(second_encoded.clone()),
            neuron: Some(neuron.clone()),
            trace: TraceContext::from_parts(second_correlation, second_span, None),
        });
        let _ = synapse.transmit(second_raw).await;

        let typed = tokio::time::timeout(RECV_TIMEOUT, rx.recv())
            .await
            .expect("Timeout rx2")
            .expect("Closed rx2");
        assert_eq!(typed.value, second_value);
        assert_eq!(typed.correlation_id(), second_correlation);
        assert_eq!(typed.span_id(), second_span);

        let raw = tokio::time::timeout(RECV_TIMEOUT, rx_raw.recv())
            .await
            .expect("Timeout raw_rx2")
            .expect("Closed raw_rx2");
        assert_eq!(raw.value.as_slice(), second_encoded.as_slice());
        assert_eq!(raw.correlation_id(), second_correlation);
        assert_eq!(raw.span_id(), second_span);
    }
}