Skip to main content

plexor_core/
dendrite.rs

1// Copyright 2025 Alecks Gates
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use crate::backpressure::{BackpressureConfig, BackpressureQueue};
8use crate::codec::{Codec, CodecName};
9use crate::logging::LogTrace;
10use crate::neuron::Neuron;
11use crate::payload::{Payload, PayloadRaw};
12use crate::reactant::{ErrorReactant, Reactant, ReactantRaw};
13use crate::synapse::SynapseError;
14use futures_util::future::join_all;
15use std::future::Future;
16use std::marker::PhantomData;
17use std::pin::Pin;
18use std::sync::Arc;
19use parking_lot::RwLock;
20use thiserror::Error;
21use tracing::Instrument;
22
23#[derive(Error, Debug)]
24pub enum DendriteError {
25    #[error("Failed to acquire read lock on reactants for neuron '{neuron_name}'")]
26    ReactantsReadLock { neuron_name: String },
27    #[error("Failed to acquire write lock on reactants for neuron '{neuron_name}'")]
28    ReactantsWriteLock { neuron_name: String },
29    #[error("Failed to acquire read lock on raw reactants for neuron '{neuron_name}'")]
30    RawReactantsReadLock { neuron_name: String },
31    #[error("Failed to acquire write lock on raw reactants for neuron '{neuron_name}'")]
32    RawReactantsWriteLock { neuron_name: String },
33    #[error("Failed to acquire read lock on error reactants for neuron '{neuron_name}'")]
34    ErrorReactantsReadLock { neuron_name: String },
35    #[error("Failed to acquire write lock on error reactants for neuron '{neuron_name}'")]
36    ErrorReactantsWriteLock { neuron_name: String },
37    #[error("Internal error: {0}")]
38    Other(String),
39}
40
41pub struct Dendrite<T, C>
42where
43    C: Codec<T> + CodecName + Send + Sync + 'static,
44    T: Sync + Send + 'static,
45{
46    _neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
47    reactants: RwLock<Vec<Arc<dyn Reactant<T, C> + Send + Sync>>>,
48    error_reactants: RwLock<Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>>,
49    _codec_marker: PhantomData<fn() -> &'static ()>,
50    _phantom_t: PhantomData<T>,
51}
52
53impl<T, C> Dendrite<T, C>
54where
55    C: Codec<T> + CodecName + Send + Sync + 'static,
56    T: Sync + Send + 'static,
57{
58    #[must_use]
59    pub fn new(
60        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
61        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
62        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
63    ) -> Self {
64        Self {
65            _neuron: neuron,
66            reactants: RwLock::new(reactants),
67            error_reactants: RwLock::new(error_reactants),
68            _codec_marker: PhantomData,
69            _phantom_t: PhantomData,
70        }
71    }
72
73    /// Add reactants to the dendrite after creation
74    pub fn add_reactants(
75        &self,
76        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
77    ) -> Result<(), DendriteError> {
78        if !reactants.is_empty() {
79            let mut write_guard = self.reactants.write();
80            write_guard.extend(reactants);
81        }
82        Ok(())
83    }
84
85    /// Add error reactants to the dendrite after creation
86    pub fn add_error_reactants(
87        &self,
88        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
89    ) -> Result<(), DendriteError> {
90        if !error_reactants.is_empty() {
91            let mut write_guard = self.error_reactants.write();
92            write_guard.extend(error_reactants);
93        }
94        Ok(())
95    }
96
97    pub fn transduce(
98        &self,
99        payload: Arc<Payload<T, C>>,
100    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, DendriteError>> + Send + 'static>> {
101        tracing::debug!("Dendrite::transduce called");
102
103        let reactants_clone = {
104            let guard = self.reactants.read();
105            if guard.is_empty() {
106                tracing::debug!("Dendrite::transduce no reactants, returning empty vec");
107                return Box::pin(async move { Ok(vec![]) });
108            }
109            guard.clone()
110        };
111
112        let error_reactants_clone = {
113            let guard = self.error_reactants.read();
114            guard.clone()
115        };
116
117        let payload_clone = payload.clone();
118        tracing::debug!(
119            "Dendrite::transduce - Cloned {} reactants",
120            reactants_clone.len()
121        );
122
123        Box::pin(
124            async move {
125                tracing::debug!(
126                    "Dendrite::transduce creating futures for {} reactants",
127                    reactants_clone.len()
128                );
129                let futures = reactants_clone
130                    .iter()
131                    .map(|reactant| reactant.react(payload_clone.clone()))
132                    .collect::<Vec<_>>();
133
134                tracing::debug!(
135                    "Dendrite::transduce awaiting join_all of {} futures",
136                    futures.len()
137                );
138                let results = join_all(futures).await;
139                tracing::debug!("Dendrite::transduce join_all completed");
140
141                let mut errors = Vec::new();
142                let successes: Vec<()> = results
143                    .into_iter()
144                    .filter_map(|r| match r {
145                        Ok(_) => Some(()),
146                        Err(e) => {
147                            errors.push(e);
148                            None
149                        }
150                    })
151                    .collect();
152
153                if !errors.is_empty() && !error_reactants_clone.is_empty() {
154                    // Dispatch errors to error reactants
155                    let error_futures = errors.into_iter().flat_map(|err| {
156                        let err_arc = Arc::new(err);
157                        let p = payload_clone.clone();
158                        error_reactants_clone
159                            .iter()
160                            .map(move |er| er.react_error(err_arc.clone(), p.clone()))
161                    });
162                    join_all(error_futures).await;
163                } else if !errors.is_empty() {
164                    for e in errors {
165                        tracing::error!("Reactant error: {e}");
166                    }
167                }
168
169                Ok(successes)
170            }
171            .instrument(payload.span_debug("Dendrite::transduce")),
172        )
173    }
174}
175
176#[allow(clippy::type_complexity)]
177pub struct DendriteDecoder<T, C>
178where
179    C: Codec<T> + CodecName + Send + Sync + 'static,
180    T: Send + Sync + 'static,
181{
182    reactants: Arc<RwLock<Vec<Arc<dyn Reactant<T, C> + Send + Sync>>>>,
183    raw_reactants: Arc<RwLock<Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>>>,
184    error_reactants: Arc<RwLock<Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>>>,
185    ingress_queue: RwLock<Option<Arc<BackpressureQueue<Arc<PayloadRaw<T, C>>>>>>,
186    _codec_marker: PhantomData<fn() -> &'static ()>,
187    _phantom_t: PhantomData<T>,
188}
189
190impl<T, C> DendriteDecoder<T, C>
191where
192    C: Codec<T> + CodecName + Send + Sync + 'static,
193    T: Send + Sync + 'static,
194{
195    #[must_use]
196    pub fn new(
197        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
198        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
199        raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
200        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
201        backpressure: Option<BackpressureConfig>,
202    ) -> Self {
203        let reactants = Arc::new(RwLock::new(reactants));
204        let raw_reactants = Arc::new(RwLock::new(raw_reactants));
205        let error_reactants = Arc::new(RwLock::new(error_reactants));
206        let reactants_clone = reactants.clone();
207        let raw_reactants_clone = raw_reactants.clone();
208        let error_reactants_clone = error_reactants.clone();
209
210        let ingress_queue = BackpressureQueue::new(
211            neuron.name(),
212            backpressure.unwrap_or_default(),
213            move |payload: Arc<PayloadRaw<T, C>>| {
214                Self::process_ingress(
215                    reactants_clone.clone(),
216                    raw_reactants_clone.clone(),
217                    error_reactants_clone.clone(),
218                    payload,
219                )
220            },
221        );
222
223        Self {
224            reactants,
225            raw_reactants,
226            error_reactants,
227            ingress_queue: RwLock::new(Some(Arc::new(ingress_queue))),
228            _codec_marker: PhantomData,
229            _phantom_t: PhantomData,
230        }
231    }
232
233    #[allow(clippy::type_complexity)]
234    async fn process_ingress(
235        rs: Arc<RwLock<Vec<Arc<dyn Reactant<T, C> + Send + Sync>>>>,
236        rrs: Arc<RwLock<Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>>>,
237        ers: Arc<RwLock<Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>>>,
238        payload: Arc<PayloadRaw<T, C>>,
239    ) {
240        let span = payload.span_debug("DendriteDecoder::process_ingress");
241        async move {
242            let neuron = payload.neuron.clone();
243            let decoded_value = match neuron.decode(&payload.value) {
244                Ok(value) => value,
245                Err(_) => return,
246            };
247
248            let decoded_payload = Arc::new(Payload::from_parts(
249                Arc::new(decoded_value),
250                neuron.clone(),
251                payload.trace,
252            ));
253
254            let reactants_vec: Vec<_> = {
255                let guard = rs.read();
256                guard.iter().cloned().collect()
257            };
258
259            let raw_reactants_vec: Vec<_> = {
260                let guard = rrs.read();
261                guard.iter().cloned().collect()
262            };
263
264            let error_reactants_vec: Vec<_> = {
265                let guard = ers.read();
266                guard.iter().cloned().collect()
267            };
268
269            if reactants_vec.is_empty() && raw_reactants_vec.is_empty() {
270                return;
271            }
272
273            let decoded_futures = reactants_vec
274                .iter()
275                .map(|reactant| reactant.react(decoded_payload.clone()));
276
277            let raw_futures = raw_reactants_vec
278                .iter()
279                .map(|raw_reactant| raw_reactant.react(payload.clone()));
280
281            let (decoded_results, raw_results) =
282                futures_util::future::join(join_all(decoded_futures), join_all(raw_futures)).await;
283
284            let mut errors = Vec::new();
285            for res in decoded_results {
286                if let Err(e) = res {
287                    errors.push(e);
288                }
289            }
290            for res in raw_results {
291                if let Err(e) = res {
292                    errors.push(e);
293                }
294            }
295
296            if !errors.is_empty() && !error_reactants_vec.is_empty() {
297                let error_futures = errors.into_iter().flat_map(|err| {
298                    let err_arc = Arc::new(err);
299                    let p = decoded_payload.clone();
300                    error_reactants_vec
301                        .iter()
302                        .map(move |er| er.react_error(err_arc.clone(), p.clone()))
303                });
304                join_all(error_futures).await;
305            } else if !errors.is_empty() {
306                for e in errors {
307                    tracing::error!("Reactant error: {e}");
308                }
309            }
310        }
311        .instrument(span)
312        .await
313    }
314
315    /// Add reactants to the dendrite decoder after creation
316    pub fn add_reactants(
317        &self,
318        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
319    ) -> Result<(), DendriteError> {
320        if !reactants.is_empty() {
321            let mut write_guard = self.reactants.write();
322            write_guard.extend(reactants);
323        }
324        Ok(())
325    }
326
327    /// Add raw reactants to the dendrite decoder after creation
328    pub fn add_raw_reactants(
329        &self,
330        raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
331    ) -> Result<(), DendriteError> {
332        if !raw_reactants.is_empty() {
333            let mut write_guard = self.raw_reactants.write();
334            write_guard.extend(raw_reactants);
335        }
336        Ok(())
337    }
338
339    /// Add error reactants to the dendrite decoder after creation
340    pub fn add_error_reactants(
341        &self,
342        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
343    ) -> Result<(), DendriteError> {
344        if !error_reactants.is_empty() {
345            let mut write_guard = self.error_reactants.write();
346            write_guard.extend(error_reactants);
347        }
348        Ok(())
349    }
350
351    #[allow(clippy::type_complexity)]
352    pub fn transduce(
353        &self,
354        payload: Arc<PayloadRaw<T, C>>,
355    ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), DendriteError>> + Send + 'static>>
356    {
357        let queue_lock = self.ingress_queue.read();
358        let queue: Arc<BackpressureQueue<Arc<PayloadRaw<T, C>>>> = match &*queue_lock {
359            Some(q) => q.clone(),
360            None => {
361                return Box::pin(async move { Ok((vec![], vec![])) });
362            }
363        };
364        drop(queue_lock);
365
366        Box::pin(async move {
367            queue.push(payload).await.map_err(|e| match e {
368                SynapseError::QueueFull { neuron_name } => {
369                    DendriteError::ReactantsWriteLock { neuron_name }
370                }
371                _ => DendriteError::ReactantsReadLock {
372                    neuron_name: "unknown".to_string(),
373                },
374            })?;
375            // BackpressureQueue handles the actual processing in background.
376            // For now, return empty vecs as the "acks" because processing is async.
377            Ok((vec![], vec![]))
378        })
379    }
380}
381
382#[cfg(test)]
383mod tests {
384    use super::*;
385    use crate::neuron::NeuronImpl;
386    use crate::test_utils::{
387        DebugCodec, DebugStruct, TokioMpscReactant, TokioMpscReactantRaw, test_namespace,
388    };
389    use std::thread;
390    use tokio::sync::mpsc::channel;
391    use uuid::Uuid;
392
393    #[tokio::test]
394    async fn test_dendrite_transduce() {
395        let ns = test_namespace();
396        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
397        let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
398            Arc::new(neuron_impl);
399
400        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
401
402        let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
403            vec![Arc::new(TokioMpscReactant { sender: tx })];
404        let dendrite = Dendrite::new(neuron_arc.clone(), reactants, vec![]);
405
406        let debug_struct_val = DebugStruct {
407            foo: 42,
408            bar: "test_value".to_owned(),
409        };
410        let uuid = Uuid::now_v7();
411        let _ = dendrite
412            .transduce(Payload::with_correlation(
413                debug_struct_val.clone(),
414                neuron_arc.clone(),
415                Some(uuid),
416            ))
417            .await;
418
419        assert_eq!(rx.len(), 1);
420        let p = rx.recv().await.unwrap();
421        assert_eq!(*p.value, debug_struct_val);
422        assert_eq!(p.correlation_id(), uuid);
423    }
424
425    #[tokio::test]
426    async fn test_dendrite_multiple_reactants() {
427        let ns = test_namespace();
428        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
429        let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
430            Arc::new(neuron_impl);
431
432        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
433        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
434
435        let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> = vec![
436            Arc::new(TokioMpscReactant { sender: tx1 }),
437            Arc::new(TokioMpscReactant { sender: tx2 }),
438        ];
439        let dendrite = Dendrite::new(neuron_arc.clone(), reactants, vec![]);
440
441        let debug_struct_val = DebugStruct {
442            foo: 100,
443            bar: "multi_test".to_owned(),
444        };
445        let uuid = Uuid::now_v7();
446        let payload_value = Arc::new(debug_struct_val.clone());
447
448        let _ = dendrite
449            .transduce(Payload::with_correlation(
450                debug_struct_val.clone(),
451                neuron_arc.clone(),
452                Some(uuid),
453            ))
454            .await;
455
456        assert_eq!(rx1.len(), 1);
457
458        let p1 = rx1.recv().await.unwrap();
459        assert_eq!(p1.value, payload_value);
460        assert_eq!(p1.correlation_id(), uuid);
461
462        assert_eq!(rx2.len(), 1);
463        let p2 = rx2.recv().await.unwrap();
464        assert_eq!(p2.value, payload_value);
465        assert_eq!(p2.correlation_id(), uuid);
466    }
467
468    #[tokio::test]
469    async fn test_decoder_dendrite_transduce() {
470        let ns = test_namespace();
471        let neuron_impl_for_encoding: NeuronImpl<DebugStruct, DebugCodec> =
472            NeuronImpl::new(ns.clone());
473        let neuron_arc_for_dendrite: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
474            Arc::new(NeuronImpl::new(ns.clone())); // Can be a different instance, or the same
475
476        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
477        let (tx_raw, mut rx_raw) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(1);
478
479        let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
480            vec![Arc::new(TokioMpscReactant { sender: tx })];
481        let raw_reactants: Vec<Arc<dyn ReactantRaw<DebugStruct, DebugCodec> + Send + Sync>> =
482            vec![Arc::new(TokioMpscReactantRaw { sender: tx_raw })];
483
484        let dendrite_decoder = DendriteDecoder::new(
485            neuron_arc_for_dendrite.clone(),
486            reactants,
487            raw_reactants,
488            vec![],
489            None,
490        );
491
492        let uuid = Uuid::now_v7();
493        let debug_struct_val = DebugStruct {
494            foo: 49,
495            bar: "foo_bar".to_owned(),
496        };
497        let encoded = neuron_impl_for_encoding
498            .encode(&debug_struct_val)
499            .expect("Encoding should succeed in test");
500
501        let _ = dendrite_decoder
502            .transduce(PayloadRaw::with_correlation(
503                encoded.clone(),
504                neuron_arc_for_dendrite.clone(),
505                Some(uuid),
506            ))
507            .await;
508
509        let p = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
510            .await
511            .expect("Timeout waiting for decoded message")
512            .expect("Channel closed");
513        assert_eq!(*p.value, debug_struct_val);
514        assert_eq!(p.correlation_id(), uuid);
515
516        let p2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx_raw.recv())
517            .await
518            .expect("Timeout waiting for raw message")
519            .expect("Channel closed");
520        assert_eq!(p2.value.as_slice(), encoded.as_slice());
521        assert_eq!(p2.correlation_id(), uuid);
522    }
523
524    #[tokio::test]
525    async fn test_dendrite_add_reactants() {
526        let ns = test_namespace();
527        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
528        let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
529            Arc::new(neuron_impl);
530
531        // Create initial reactant
532        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
533        let initial_reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
534            vec![Arc::new(TokioMpscReactant { sender: tx1 })];
535
536        // Create dendrite with initial reactant
537        let dendrite = Dendrite::new(neuron_arc.clone(), initial_reactants, vec![]);
538
539        // Create additional reactant to add later
540        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
541        let additional_reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
542            vec![Arc::new(TokioMpscReactant { sender: tx2 })];
543
544        // Add the additional reactant
545        let _ = dendrite.add_reactants(additional_reactants);
546
547        // Create a payload to send
548        let debug_struct_val = DebugStruct {
549            foo: 42,
550            bar: "test_add_reactants".to_owned(),
551        };
552        let uuid = Uuid::now_v7();
553        let payload =
554            Payload::with_correlation(debug_struct_val.clone(), neuron_arc.clone(), Some(uuid));
555
556        // Transduce the payload
557        let _ = dendrite.transduce(payload.clone()).await;
558
559        // Both reactants should receive the payload
560        assert_eq!(rx1.len(), 1);
561        let p1 = rx1.recv().await.unwrap();
562        assert_eq!(*p1.value, debug_struct_val);
563        assert_eq!(p1.correlation_id(), uuid);
564
565        assert_eq!(rx2.len(), 1);
566        let p2 = rx2.recv().await.unwrap();
567        assert_eq!(*p2.value, debug_struct_val);
568        assert_eq!(p2.correlation_id(), uuid);
569    }
570
571    #[tokio::test]
572    async fn test_dendrite_decoder_add_reactants() {
573        let ns = test_namespace();
574        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
575        let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
576            Arc::new(neuron_impl);
577
578        // Create initial reactants
579        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
580        let initial_reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
581            vec![Arc::new(TokioMpscReactant { sender: tx1 })];
582
583        let (tx_raw1, mut rx_raw1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(1);
584        let initial_raw_reactants: Vec<Arc<dyn ReactantRaw<DebugStruct, DebugCodec> + Send + Sync>> =
585            vec![Arc::new(TokioMpscReactantRaw { sender: tx_raw1 })];
586
587        // Create dendrite decoder with initial reactants
588        let dendrite_decoder = DendriteDecoder::new(
589            neuron_arc.clone(),
590            initial_reactants,
591            initial_raw_reactants,
592            vec![],
593            None,
594        );
595
596        // Create additional reactants to add later
597        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
598        let additional_reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
599            vec![Arc::new(TokioMpscReactant { sender: tx2 })];
600
601        let (tx_raw2, mut rx_raw2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(1);
602        let additional_raw_reactants: Vec<Arc<dyn ReactantRaw<DebugStruct, DebugCodec> + Send + Sync>> =
603            vec![Arc::new(TokioMpscReactantRaw { sender: tx_raw2 })];
604
605        // Add the additional reactants
606        let _ = dendrite_decoder.add_reactants(additional_reactants);
607        let _ = dendrite_decoder.add_raw_reactants(additional_raw_reactants);
608
609        // Create a payload to send
610        let debug_struct_val = DebugStruct {
611            foo: 42,
612            bar: "test_add_reactants_decoder".to_owned(),
613        };
614        let uuid = Uuid::now_v7();
615        let encoded = neuron_arc
616            .encode(&debug_struct_val)
617            .expect("Encoding should succeed in test");
618        let payload_raw =
619            PayloadRaw::with_correlation(encoded.clone(), neuron_arc.clone(), Some(uuid));
620
621        // Transduce the payload
622        let _ = dendrite_decoder.transduce(payload_raw.clone()).await;
623
624        // All reactants should receive the payload
625        let p1 = tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
626            .await
627            .expect("Timeout rx1")
628            .expect("Closed rx1");
629        assert_eq!(*p1.value, debug_struct_val);
630        assert_eq!(p1.correlation_id(), uuid);
631
632        let p2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
633            .await
634            .expect("Timeout rx2")
635            .expect("Closed rx2");
636        assert_eq!(*p2.value, debug_struct_val);
637        assert_eq!(p2.correlation_id(), uuid);
638
639        let p_raw1 = tokio::time::timeout(std::time::Duration::from_millis(100), rx_raw1.recv())
640            .await
641            .expect("Timeout raw_rx1")
642            .expect("Closed raw_rx1");
643        assert_eq!(p_raw1.value.as_slice(), encoded.as_slice());
644        assert_eq!(p_raw1.correlation_id(), uuid);
645
646        let p_raw2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx_raw2.recv())
647            .await
648            .expect("Timeout raw_rx2")
649            .expect("Closed raw_rx2");
650        assert_eq!(p_raw2.value.as_slice(), encoded.as_slice());
651        assert_eq!(p_raw2.correlation_id(), uuid);
652    }
653
654    #[tokio::test]
655    async fn test_dendrite_concurrent_readers() {
656        let ns = test_namespace();
657        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
658        let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
659            Arc::new(neuron_impl);
660
661        // Create initial reactants
662        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
663        let initial_reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
664            vec![Arc::new(TokioMpscReactant { sender: tx1 })];
665
666        // Create dendrite with initial reactant
667        let dendrite = Arc::new(Dendrite::new(neuron_arc.clone(), initial_reactants, vec![]));
668
669        // Create a payload to send
670        let debug_struct_val = DebugStruct {
671            foo: 42,
672            bar: "test_concurrent_readers".to_owned(),
673        };
674        let uuid = Uuid::now_v7();
675        let payload =
676            Payload::with_correlation(debug_struct_val.clone(), neuron_arc.clone(), Some(uuid));
677
678        // Create multiple threads that read from the dendrite concurrently
679        let num_threads = 5;
680        let mut handles = vec![];
681
682        for _ in 0..num_threads {
683            let dendrite_clone = dendrite.clone();
684            let payload_clone = payload.clone();
685
686            let handle = thread::spawn(move || {
687                let rt = tokio::runtime::Runtime::new().unwrap();
688                rt.block_on(async {
689                    let _ = dendrite_clone.transduce(payload_clone).await;
690                });
691            });
692
693            handles.push(handle);
694        }
695
696        // Wait for all threads to complete
697        for handle in handles {
698            handle.join().unwrap();
699        }
700
701        // The reactant should have received the payload num_threads times
702        for _ in 0..num_threads {
703            let p = rx1.recv().await.unwrap();
704            assert_eq!(*p.value, debug_struct_val);
705            assert_eq!(p.correlation_id(), uuid);
706        }
707    }
708}