Skip to main content

plexor_core/
dendrite.rs

1// Copyright 2025 Alecks Gates
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use crate::codec::{Codec, CodecName};
8use crate::logging::LogTrace;
9use crate::neuron::Neuron;
10use crate::payload::{Payload, PayloadRaw};
11use crate::reactant::{ErrorReactant, Reactant, ReactantRaw};
12use crate::synapse::{BackpressureConfig, BackpressureQueue, SynapseError};
13use futures_util::future::join_all;
14use std::future::Future;
15use std::marker::PhantomData;
16use std::pin::Pin;
17use std::sync::{Arc, RwLock};
18use thiserror::Error;
19use tracing::Instrument;
20
21#[derive(Error, Debug)]
22pub enum DendriteError {
23    #[error("Failed to acquire read lock on reactants for neuron '{neuron_name}'")]
24    ReactantsReadLock { neuron_name: String },
25    #[error("Failed to acquire write lock on reactants for neuron '{neuron_name}'")]
26    ReactantsWriteLock { neuron_name: String },
27    #[error("Failed to acquire read lock on raw reactants for neuron '{neuron_name}'")]
28    RawReactantsReadLock { neuron_name: String },
29    #[error("Failed to acquire write lock on raw reactants for neuron '{neuron_name}'")]
30    RawReactantsWriteLock { neuron_name: String },
31    #[error("Failed to acquire read lock on error reactants for neuron '{neuron_name}'")]
32    ErrorReactantsReadLock { neuron_name: String },
33    #[error("Failed to acquire write lock on error reactants for neuron '{neuron_name}'")]
34    ErrorReactantsWriteLock { neuron_name: String },
35    #[error("Internal error: {0}")]
36    Other(String),
37}
38
39pub struct Dendrite<T, C>
40where
41    C: Codec<T> + CodecName + Send + Sync + 'static,
42    T: Sync + Send + 'static,
43{
44    _neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
45    reactants: RwLock<Vec<Arc<dyn Reactant<T, C> + Send + Sync>>>,
46    error_reactants: RwLock<Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>>,
47    _codec_marker: PhantomData<fn() -> &'static ()>,
48    _phantom_t: PhantomData<T>,
49}
50
51impl<T, C> Dendrite<T, C>
52where
53    C: Codec<T> + CodecName + Send + Sync + 'static,
54    T: Sync + Send + 'static,
55{
56    #[must_use]
57    pub fn new(
58        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
59        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
60        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
61    ) -> Self {
62        Self {
63            _neuron: neuron,
64            reactants: RwLock::new(reactants),
65            error_reactants: RwLock::new(error_reactants),
66            _codec_marker: PhantomData,
67            _phantom_t: PhantomData,
68        }
69    }
70
71    /// Add reactants to the dendrite after creation
72    pub fn add_reactants(
73        &self,
74        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
75    ) -> Result<(), DendriteError> {
76        if !reactants.is_empty() {
77            let mut write_guard =
78                self.reactants
79                    .write()
80                    .map_err(|_| DendriteError::ReactantsWriteLock {
81                        neuron_name: self._neuron.name(),
82                    })?;
83            write_guard.extend(reactants);
84        }
85        Ok(())
86    }
87
88    /// Add error reactants to the dendrite after creation
89    pub fn add_error_reactants(
90        &self,
91        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
92    ) -> Result<(), DendriteError> {
93        if !error_reactants.is_empty() {
94            let mut write_guard = self.error_reactants.write().map_err(|_| {
95                DendriteError::ErrorReactantsWriteLock {
96                    neuron_name: self._neuron.name(),
97                }
98            })?;
99            write_guard.extend(error_reactants);
100        }
101        Ok(())
102    }
103
104    pub fn transduce(
105        &self,
106        payload: Arc<Payload<T, C>>,
107    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, DendriteError>> + Send + 'static>> {
108        tracing::debug!("Dendrite::transduce called");
109
110        let neuron_name = self._neuron.name();
111        let reactants_result =
112            self.reactants
113                .read()
114                .map_err(|_| DendriteError::ReactantsReadLock {
115                    neuron_name: neuron_name.clone(),
116                });
117        let reactants_clone = match reactants_result {
118            Ok(guard) => {
119                if guard.is_empty() {
120                    tracing::debug!("Dendrite::transduce no reactants, returning empty vec");
121                    return Box::pin(async move { Ok(vec![]) });
122                }
123                guard.clone()
124            }
125            Err(e) => return Box::pin(async move { Err(e) }),
126        };
127
128        // Clone error reactants
129        let error_reactants_result =
130            self.error_reactants
131                .read()
132                .map_err(|_| DendriteError::ReactantsReadLock {
133                    // Reusing error for now or add new one
134                    neuron_name: neuron_name.clone(),
135                });
136        let error_reactants_clone = match error_reactants_result {
137            Ok(guard) => guard.clone(),
138            Err(_) => Vec::new(), // If lock fails, just use empty list to avoid crashing transduce
139        };
140
141        let payload_clone = payload.clone();
142        tracing::debug!(
143            "Dendrite::transduce - Cloned {} reactants",
144            reactants_clone.len()
145        );
146
147        Box::pin(
148            async move {
149                tracing::debug!(
150                    "Dendrite::transduce creating futures for {} reactants",
151                    reactants_clone.len()
152                );
153                let futures = reactants_clone
154                    .iter()
155                    .map(|reactant| reactant.react(payload_clone.clone()))
156                    .collect::<Vec<_>>();
157
158                tracing::debug!(
159                    "Dendrite::transduce awaiting join_all of {} futures",
160                    futures.len()
161                );
162                let results = join_all(futures).await;
163                tracing::debug!("Dendrite::transduce join_all completed");
164
165                let mut errors = Vec::new();
166                let successes: Vec<()> = results
167                    .into_iter()
168                    .filter_map(|r| match r {
169                        Ok(_) => Some(()),
170                        Err(e) => {
171                            errors.push(e);
172                            None
173                        }
174                    })
175                    .collect();
176
177                if !errors.is_empty() && !error_reactants_clone.is_empty() {
178                    // Dispatch errors to error reactants
179                    let error_futures = errors.into_iter().flat_map(|err| {
180                        let err_arc = Arc::new(err);
181                        let p = payload_clone.clone();
182                        error_reactants_clone
183                            .iter()
184                            .map(move |er| er.react_error(err_arc.clone(), p.clone()))
185                    });
186                    join_all(error_futures).await;
187                } else if !errors.is_empty() {
188                    for e in errors {
189                        tracing::error!("Reactant error: {e}");
190                    }
191                }
192
193                Ok(successes)
194            }
195            .instrument(payload.span_debug("Dendrite::transduce")),
196        )
197    }
198}
199
200#[allow(clippy::type_complexity)]
201pub struct DendriteDecoder<T, C>
202where
203    C: Codec<T> + CodecName + Send + Sync + 'static,
204    T: Send + Sync + 'static,
205{
206    neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
207    reactants: Arc<RwLock<Vec<Arc<dyn Reactant<T, C> + Send + Sync>>>>,
208    raw_reactants: Arc<RwLock<Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>>>,
209    error_reactants: Arc<RwLock<Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>>>,
210    ingress_queue: RwLock<Option<Arc<BackpressureQueue<Arc<PayloadRaw<T, C>>>>>>,
211    _codec_marker: PhantomData<fn() -> &'static ()>,
212    _phantom_t: PhantomData<T>,
213}
214
215impl<T, C> DendriteDecoder<T, C>
216where
217    C: Codec<T> + CodecName + Send + Sync + 'static,
218    T: Send + Sync + 'static,
219{
220    #[must_use]
221    pub fn new(
222        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
223        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
224        raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
225        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
226        backpressure: Option<BackpressureConfig>,
227    ) -> Self {
228        let reactants = Arc::new(RwLock::new(reactants));
229        let raw_reactants = Arc::new(RwLock::new(raw_reactants));
230        let error_reactants = Arc::new(RwLock::new(error_reactants));
231        let neuron_clone = neuron.clone();
232        let reactants_clone = reactants.clone();
233        let raw_reactants_clone = raw_reactants.clone();
234        let error_reactants_clone = error_reactants.clone();
235
236        let ingress_queue = BackpressureQueue::new(
237            neuron.name(),
238            backpressure.unwrap_or_default(),
239            move |payload: Arc<PayloadRaw<T, C>>| {
240                Self::process_ingress(
241                    neuron_clone.clone(),
242                    reactants_clone.clone(),
243                    raw_reactants_clone.clone(),
244                    error_reactants_clone.clone(),
245                    payload,
246                )
247            },
248        );
249
250        Self {
251            neuron,
252            reactants,
253            raw_reactants,
254            error_reactants,
255            ingress_queue: RwLock::new(Some(Arc::new(ingress_queue))),
256            _codec_marker: PhantomData,
257            _phantom_t: PhantomData,
258        }
259    }
260
261    #[allow(clippy::type_complexity)]
262    async fn process_ingress(
263        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
264        rs: Arc<RwLock<Vec<Arc<dyn Reactant<T, C> + Send + Sync>>>>,
265        rrs: Arc<RwLock<Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>>>,
266        ers: Arc<RwLock<Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>>>,
267        payload: Arc<PayloadRaw<T, C>>,
268    ) {
269        let span = payload.span_debug("DendriteDecoder::process_ingress");
270        async move {
271            let decoded_value = match neuron.decode(&payload.value) {
272                Ok(value) => value,
273                Err(_) => return,
274            };
275
276            let decoded_payload = Arc::new(Payload::from_parts(
277                Arc::new(decoded_value),
278                neuron.clone(),
279                payload.trace,
280            ));
281
282            let reactants_vec: Vec<_> = {
283                let guard = rs.read().unwrap();
284                guard.iter().cloned().collect()
285            };
286
287            let raw_reactants_vec: Vec<_> = {
288                let guard = rrs.read().unwrap();
289                guard.iter().cloned().collect()
290            };
291
292            let error_reactants_vec: Vec<_> = {
293                let guard = ers.read().unwrap();
294                guard.iter().cloned().collect()
295            };
296
297            if reactants_vec.is_empty() && raw_reactants_vec.is_empty() {
298                return;
299            }
300
301            let decoded_futures = reactants_vec
302                .iter()
303                .map(|reactant| reactant.react(decoded_payload.clone()));
304
305            let raw_futures = raw_reactants_vec
306                .iter()
307                .map(|raw_reactant| raw_reactant.react(payload.clone()));
308
309            let (decoded_results, raw_results) =
310                futures_util::future::join(join_all(decoded_futures), join_all(raw_futures)).await;
311
312            let mut errors = Vec::new();
313            for res in decoded_results {
314                if let Err(e) = res {
315                    errors.push(e);
316                }
317            }
318            for res in raw_results {
319                if let Err(e) = res {
320                    errors.push(e);
321                }
322            }
323
324            if !errors.is_empty() && !error_reactants_vec.is_empty() {
325                let error_futures = errors.into_iter().flat_map(|err| {
326                    let err_arc = Arc::new(err);
327                    let p = decoded_payload.clone();
328                    error_reactants_vec
329                        .iter()
330                        .map(move |er| er.react_error(err_arc.clone(), p.clone()))
331                });
332                join_all(error_futures).await;
333            } else if !errors.is_empty() {
334                for e in errors {
335                    tracing::error!("Reactant error: {e}");
336                }
337            }
338        }
339        .instrument(span)
340        .await
341    }
342
343    /// Add reactants to the dendrite decoder after creation
344    pub fn add_reactants(
345        &self,
346        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
347    ) -> Result<(), DendriteError> {
348        if !reactants.is_empty() {
349            let mut write_guard =
350                self.reactants
351                    .write()
352                    .map_err(|_| DendriteError::ReactantsWriteLock {
353                        neuron_name: self.neuron.name(),
354                    })?;
355            write_guard.extend(reactants);
356        }
357        Ok(())
358    }
359
360    /// Add raw reactants to the dendrite decoder after creation
361    pub fn add_raw_reactants(
362        &self,
363        raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
364    ) -> Result<(), DendriteError> {
365        if !raw_reactants.is_empty() {
366            let mut write_guard =
367                self.raw_reactants
368                    .write()
369                    .map_err(|_| DendriteError::RawReactantsWriteLock {
370                        neuron_name: self.neuron.name(),
371                    })?;
372            write_guard.extend(raw_reactants);
373        }
374        Ok(())
375    }
376
377    /// Add error reactants to the dendrite decoder after creation
378    pub fn add_error_reactants(
379        &self,
380        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
381    ) -> Result<(), DendriteError> {
382        if !error_reactants.is_empty() {
383            let mut write_guard = self.error_reactants.write().map_err(|_| {
384                DendriteError::ErrorReactantsWriteLock {
385                    neuron_name: self.neuron.name(),
386                }
387            })?;
388            write_guard.extend(error_reactants);
389        }
390        Ok(())
391    }
392
393    #[allow(clippy::type_complexity)]
394    pub fn transduce(
395        &self,
396        payload: Arc<PayloadRaw<T, C>>,
397    ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), DendriteError>> + Send + 'static>>
398    {
399        let queue_lock = self.ingress_queue.read().unwrap();
400        let queue = match &*queue_lock {
401            Some(q) => q.clone(),
402            None => {
403                return Box::pin(async move { Ok((vec![], vec![])) });
404            }
405        };
406        drop(queue_lock);
407
408        Box::pin(async move {
409            queue.push(payload).await.map_err(|e| match e {
410                SynapseError::QueueFull { neuron_name } => {
411                    DendriteError::ReactantsWriteLock { neuron_name }
412                }
413                _ => DendriteError::ReactantsReadLock {
414                    neuron_name: "unknown".to_string(),
415                },
416            })?;
417            // BackpressureQueue handles the actual processing in background.
418            // For now, return empty vecs as the "acks" because processing is async.
419            Ok((vec![], vec![]))
420        })
421    }
422}
423
424#[cfg(test)]
425mod tests {
426    use super::*;
427    use crate::neuron::NeuronImpl;
428    use crate::test_utils::{
429        DebugCodec, DebugStruct, TokioMpscReactant, TokioMpscReactantRaw, test_namespace,
430    };
431    use std::thread;
432    use tokio::sync::mpsc::channel;
433    use uuid::Uuid;
434
435    #[tokio::test]
436    async fn test_dendrite_transduce() {
437        let ns = test_namespace();
438        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
439        let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
440            Arc::new(neuron_impl);
441
442        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
443
444        let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
445            vec![Arc::new(TokioMpscReactant { sender: tx })];
446        let dendrite = Dendrite::new(neuron_arc.clone(), reactants, vec![]);
447
448        let debug_struct_val = DebugStruct {
449            foo: 42,
450            bar: "test_value".to_owned(),
451        };
452        let uuid = Uuid::now_v7();
453        let _ = dendrite
454            .transduce(Payload::with_correlation(
455                debug_struct_val.clone(),
456                neuron_arc.clone(),
457                Some(uuid),
458            ))
459            .await;
460
461        assert_eq!(rx.len(), 1);
462        let p = rx.recv().await.unwrap();
463        assert_eq!(*p.value, debug_struct_val);
464        assert_eq!(p.correlation_id(), uuid);
465    }
466
467    #[tokio::test]
468    async fn test_dendrite_multiple_reactants() {
469        let ns = test_namespace();
470        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
471        let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
472            Arc::new(neuron_impl);
473
474        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
475        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
476
477        let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> = vec![
478            Arc::new(TokioMpscReactant { sender: tx1 }),
479            Arc::new(TokioMpscReactant { sender: tx2 }),
480        ];
481        let dendrite = Dendrite::new(neuron_arc.clone(), reactants, vec![]);
482
483        let debug_struct_val = DebugStruct {
484            foo: 100,
485            bar: "multi_test".to_owned(),
486        };
487        let uuid = Uuid::now_v7();
488        let payload_value = Arc::new(debug_struct_val.clone());
489
490        let _ = dendrite
491            .transduce(Payload::with_correlation(
492                debug_struct_val.clone(),
493                neuron_arc.clone(),
494                Some(uuid),
495            ))
496            .await;
497
498        assert_eq!(rx1.len(), 1);
499
500        let p1 = rx1.recv().await.unwrap();
501        assert_eq!(p1.value, payload_value);
502        assert_eq!(p1.correlation_id(), uuid);
503
504        assert_eq!(rx2.len(), 1);
505        let p2 = rx2.recv().await.unwrap();
506        assert_eq!(p2.value, payload_value);
507        assert_eq!(p2.correlation_id(), uuid);
508    }
509
510    #[tokio::test]
511    async fn test_decoder_dendrite_transduce() {
512        let ns = test_namespace();
513        let neuron_impl_for_encoding: NeuronImpl<DebugStruct, DebugCodec> =
514            NeuronImpl::new(ns.clone());
515        let neuron_arc_for_dendrite: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
516            Arc::new(NeuronImpl::new(ns.clone())); // Can be a different instance, or the same
517
518        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
519        let (tx_raw, mut rx_raw) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(1);
520
521        let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
522            vec![Arc::new(TokioMpscReactant { sender: tx })];
523        let raw_reactants: Vec<Arc<dyn ReactantRaw<DebugStruct, DebugCodec> + Send + Sync>> =
524            vec![Arc::new(TokioMpscReactantRaw { sender: tx_raw })];
525
526        let dendrite_decoder = DendriteDecoder::new(
527            neuron_arc_for_dendrite.clone(),
528            reactants,
529            raw_reactants,
530            vec![],
531            None,
532        );
533
534        let uuid = Uuid::now_v7();
535        let debug_struct_val = DebugStruct {
536            foo: 49,
537            bar: "foo_bar".to_owned(),
538        };
539        let encoded = neuron_impl_for_encoding
540            .encode(&debug_struct_val)
541            .expect("Encoding should succeed in test");
542
543        let _ = dendrite_decoder
544            .transduce(PayloadRaw::with_correlation(
545                encoded.clone(),
546                Some(neuron_arc_for_dendrite.clone()),
547                Some(uuid),
548            ))
549            .await;
550
551        let p = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
552            .await
553            .expect("Timeout waiting for decoded message")
554            .expect("Channel closed");
555        assert_eq!(*p.value, debug_struct_val);
556        assert_eq!(p.correlation_id(), uuid);
557
558        let p2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx_raw.recv())
559            .await
560            .expect("Timeout waiting for raw message")
561            .expect("Channel closed");
562        assert_eq!(p2.value.as_slice(), encoded.as_slice());
563        assert_eq!(p2.correlation_id(), uuid);
564    }
565
566    #[tokio::test]
567    async fn test_dendrite_add_reactants() {
568        let ns = test_namespace();
569        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
570        let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
571            Arc::new(neuron_impl);
572
573        // Create initial reactant
574        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
575        let initial_reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
576            vec![Arc::new(TokioMpscReactant { sender: tx1 })];
577
578        // Create dendrite with initial reactant
579        let dendrite = Dendrite::new(neuron_arc.clone(), initial_reactants, vec![]);
580
581        // Create additional reactant to add later
582        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
583        let additional_reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
584            vec![Arc::new(TokioMpscReactant { sender: tx2 })];
585
586        // Add the additional reactant
587        let _ = dendrite.add_reactants(additional_reactants);
588
589        // Create a payload to send
590        let debug_struct_val = DebugStruct {
591            foo: 42,
592            bar: "test_add_reactants".to_owned(),
593        };
594        let uuid = Uuid::now_v7();
595        let payload =
596            Payload::with_correlation(debug_struct_val.clone(), neuron_arc.clone(), Some(uuid));
597
598        // Transduce the payload
599        let _ = dendrite.transduce(payload.clone()).await;
600
601        // Both reactants should receive the payload
602        assert_eq!(rx1.len(), 1);
603        let p1 = rx1.recv().await.unwrap();
604        assert_eq!(*p1.value, debug_struct_val);
605        assert_eq!(p1.correlation_id(), uuid);
606
607        assert_eq!(rx2.len(), 1);
608        let p2 = rx2.recv().await.unwrap();
609        assert_eq!(*p2.value, debug_struct_val);
610        assert_eq!(p2.correlation_id(), uuid);
611    }
612
613    #[tokio::test]
614    async fn test_dendrite_decoder_add_reactants() {
615        let ns = test_namespace();
616        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
617        let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
618            Arc::new(neuron_impl);
619
620        // Create initial reactants
621        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
622        let initial_reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
623            vec![Arc::new(TokioMpscReactant { sender: tx1 })];
624
625        let (tx_raw1, mut rx_raw1) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(1);
626        let initial_raw_reactants: Vec<
627            Arc<dyn ReactantRaw<DebugStruct, DebugCodec> + Send + Sync>,
628        > = vec![Arc::new(TokioMpscReactantRaw { sender: tx_raw1 })];
629
630        // Create dendrite decoder with initial reactants
631        let dendrite_decoder = DendriteDecoder::new(
632            neuron_arc.clone(),
633            initial_reactants,
634            initial_raw_reactants,
635            vec![],
636            None,
637        );
638
639        // Create additional reactants to add later
640        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
641        let additional_reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
642            vec![Arc::new(TokioMpscReactant { sender: tx2 })];
643
644        let (tx_raw2, mut rx_raw2) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(1);
645        let additional_raw_reactants: Vec<
646            Arc<dyn ReactantRaw<DebugStruct, DebugCodec> + Send + Sync>,
647        > = vec![Arc::new(TokioMpscReactantRaw { sender: tx_raw2 })];
648
649        // Add the additional reactants
650        let _ = dendrite_decoder.add_reactants(additional_reactants);
651        let _ = dendrite_decoder.add_raw_reactants(additional_raw_reactants);
652
653        // Create a payload to send
654        let debug_struct_val = DebugStruct {
655            foo: 42,
656            bar: "test_add_reactants_decoder".to_owned(),
657        };
658        let uuid = Uuid::now_v7();
659        let encoded = neuron_arc
660            .encode(&debug_struct_val)
661            .expect("Encoding should succeed in test");
662        let payload_raw =
663            PayloadRaw::with_correlation(encoded.clone(), Some(neuron_arc.clone()), Some(uuid));
664
665        // Transduce the payload
666        let _ = dendrite_decoder.transduce(payload_raw.clone()).await;
667
668        // All reactants should receive the payload
669        let p1 = tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
670            .await
671            .expect("Timeout rx1")
672            .expect("Closed rx1");
673        assert_eq!(*p1.value, debug_struct_val);
674        assert_eq!(p1.correlation_id(), uuid);
675
676        let p2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
677            .await
678            .expect("Timeout rx2")
679            .expect("Closed rx2");
680        assert_eq!(*p2.value, debug_struct_val);
681        assert_eq!(p2.correlation_id(), uuid);
682
683        let p_raw1 = tokio::time::timeout(std::time::Duration::from_millis(100), rx_raw1.recv())
684            .await
685            .expect("Timeout raw_rx1")
686            .expect("Closed raw_rx1");
687        assert_eq!(p_raw1.value.as_slice(), encoded.as_slice());
688        assert_eq!(p_raw1.correlation_id(), uuid);
689
690        let p_raw2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx_raw2.recv())
691            .await
692            .expect("Timeout raw_rx2")
693            .expect("Closed raw_rx2");
694        assert_eq!(p_raw2.value.as_slice(), encoded.as_slice());
695        assert_eq!(p_raw2.correlation_id(), uuid);
696    }
697
698    #[tokio::test]
699    async fn test_dendrite_concurrent_readers() {
700        let ns = test_namespace();
701        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
702        let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
703            Arc::new(neuron_impl);
704
705        // Create initial reactants
706        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
707        let initial_reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
708            vec![Arc::new(TokioMpscReactant { sender: tx1 })];
709
710        // Create dendrite with initial reactant
711        let dendrite = Arc::new(Dendrite::new(neuron_arc.clone(), initial_reactants, vec![]));
712
713        // Create a payload to send
714        let debug_struct_val = DebugStruct {
715            foo: 42,
716            bar: "test_concurrent_readers".to_owned(),
717        };
718        let uuid = Uuid::now_v7();
719        let payload =
720            Payload::with_correlation(debug_struct_val.clone(), neuron_arc.clone(), Some(uuid));
721
722        // Create multiple threads that read from the dendrite concurrently
723        let num_threads = 5;
724        let mut handles = vec![];
725
726        for _ in 0..num_threads {
727            let dendrite_clone = dendrite.clone();
728            let payload_clone = payload.clone();
729
730            let handle = thread::spawn(move || {
731                let rt = tokio::runtime::Runtime::new().unwrap();
732                rt.block_on(async {
733                    let _ = dendrite_clone.transduce(payload_clone).await;
734                });
735            });
736
737            handles.push(handle);
738        }
739
740        // Wait for all threads to complete
741        for handle in handles {
742            handle.join().unwrap();
743        }
744
745        // The reactant should have received the payload num_threads times
746        for _ in 0..num_threads {
747            let p = rx1.recv().await.unwrap();
748            assert_eq!(*p.value, debug_struct_val);
749            assert_eq!(p.correlation_id(), uuid);
750        }
751    }
752}