1use crate::backpressure::{BackpressureConfig, BackpressureQueue};
8use crate::codec::{Codec, CodecName};
9use crate::logging::LogTrace;
10use crate::neuron::Neuron;
11use crate::payload::{Payload, PayloadRaw};
12use crate::reactant::{ErrorReactant, Reactant, ReactantRaw};
13use crate::synapse::SynapseError;
14use futures_util::future::join_all;
15use std::future::Future;
16use std::marker::PhantomData;
17use std::pin::Pin;
18use std::sync::Arc;
19use parking_lot::RwLock;
20use thiserror::Error;
21use tracing::Instrument;
22
23#[derive(Error, Debug)]
24pub enum DendriteError {
25 #[error("Failed to acquire read lock on reactants for neuron '{neuron_name}'")]
26 ReactantsReadLock { neuron_name: String },
27 #[error("Failed to acquire write lock on reactants for neuron '{neuron_name}'")]
28 ReactantsWriteLock { neuron_name: String },
29 #[error("Failed to acquire read lock on raw reactants for neuron '{neuron_name}'")]
30 RawReactantsReadLock { neuron_name: String },
31 #[error("Failed to acquire write lock on raw reactants for neuron '{neuron_name}'")]
32 RawReactantsWriteLock { neuron_name: String },
33 #[error("Failed to acquire read lock on error reactants for neuron '{neuron_name}'")]
34 ErrorReactantsReadLock { neuron_name: String },
35 #[error("Failed to acquire write lock on error reactants for neuron '{neuron_name}'")]
36 ErrorReactantsWriteLock { neuron_name: String },
37 #[error("Internal error: {0}")]
38 Other(String),
39}
40
41pub struct Dendrite<T, C>
42where
43 C: Codec<T> + CodecName + Send + Sync + 'static,
44 T: Sync + Send + 'static,
45{
46 _neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
47 reactants: RwLock<Vec<Arc<dyn Reactant<T, C> + Send + Sync>>>,
48 error_reactants: RwLock<Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>>,
49 _codec_marker: PhantomData<fn() -> &'static ()>,
50 _phantom_t: PhantomData<T>,
51}
52
53impl<T, C> Dendrite<T, C>
54where
55 C: Codec<T> + CodecName + Send + Sync + 'static,
56 T: Sync + Send + 'static,
57{
58 #[must_use]
59 pub fn new(
60 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
61 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
62 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
63 ) -> Self {
64 Self {
65 _neuron: neuron,
66 reactants: RwLock::new(reactants),
67 error_reactants: RwLock::new(error_reactants),
68 _codec_marker: PhantomData,
69 _phantom_t: PhantomData,
70 }
71 }
72
73 pub fn add_reactants(
75 &self,
76 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
77 ) -> Result<(), DendriteError> {
78 if !reactants.is_empty() {
79 let mut write_guard = self.reactants.write();
80 write_guard.extend(reactants);
81 }
82 Ok(())
83 }
84
85 pub fn add_error_reactants(
87 &self,
88 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
89 ) -> Result<(), DendriteError> {
90 if !error_reactants.is_empty() {
91 let mut write_guard = self.error_reactants.write();
92 write_guard.extend(error_reactants);
93 }
94 Ok(())
95 }
96
97 pub fn transduce(
98 &self,
99 payload: Arc<Payload<T, C>>,
100 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, DendriteError>> + Send + 'static>> {
101 tracing::debug!("Dendrite::transduce called");
102
103 let reactants_clone = {
104 let guard = self.reactants.read();
105 if guard.is_empty() {
106 tracing::debug!("Dendrite::transduce no reactants, returning empty vec");
107 return Box::pin(async move { Ok(vec![]) });
108 }
109 guard.clone()
110 };
111
112 let error_reactants_clone = {
113 let guard = self.error_reactants.read();
114 guard.clone()
115 };
116
117 let payload_clone = payload.clone();
118 tracing::debug!(
119 "Dendrite::transduce - Cloned {} reactants",
120 reactants_clone.len()
121 );
122
123 Box::pin(
124 async move {
125 tracing::debug!(
126 "Dendrite::transduce creating futures for {} reactants",
127 reactants_clone.len()
128 );
129 let futures = reactants_clone
130 .iter()
131 .map(|reactant| reactant.react(payload_clone.clone()))
132 .collect::<Vec<_>>();
133
134 tracing::debug!(
135 "Dendrite::transduce awaiting join_all of {} futures",
136 futures.len()
137 );
138 let results = join_all(futures).await;
139 tracing::debug!("Dendrite::transduce join_all completed");
140
141 let mut errors = Vec::new();
142 let successes: Vec<()> = results
143 .into_iter()
144 .filter_map(|r| match r {
145 Ok(_) => Some(()),
146 Err(e) => {
147 errors.push(e);
148 None
149 }
150 })
151 .collect();
152
153 if !errors.is_empty() && !error_reactants_clone.is_empty() {
154 let error_futures = errors.into_iter().flat_map(|err| {
156 let err_arc = Arc::new(err);
157 let p = payload_clone.clone();
158 error_reactants_clone
159 .iter()
160 .map(move |er| er.react_error(err_arc.clone(), p.clone()))
161 });
162 join_all(error_futures).await;
163 } else if !errors.is_empty() {
164 for e in errors {
165 tracing::error!("Reactant error: {e}");
166 }
167 }
168
169 Ok(successes)
170 }
171 .instrument(payload.span_debug("Dendrite::transduce")),
172 )
173 }
174}
175
176#[allow(clippy::type_complexity)]
177pub struct DendriteDecoder<T, C>
178where
179 C: Codec<T> + CodecName + Send + Sync + 'static,
180 T: Send + Sync + 'static,
181{
182 reactants: Arc<RwLock<Vec<Arc<dyn Reactant<T, C> + Send + Sync>>>>,
183 raw_reactants: Arc<RwLock<Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>>>,
184 error_reactants: Arc<RwLock<Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>>>,
185 ingress_queue: RwLock<Option<Arc<BackpressureQueue<Arc<PayloadRaw<T, C>>>>>>,
186 _codec_marker: PhantomData<fn() -> &'static ()>,
187 _phantom_t: PhantomData<T>,
188}
189
190impl<T, C> DendriteDecoder<T, C>
191where
192 C: Codec<T> + CodecName + Send + Sync + 'static,
193 T: Send + Sync + 'static,
194{
195 #[must_use]
196 pub fn new(
197 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
198 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
199 raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
200 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
201 backpressure: Option<BackpressureConfig>,
202 ) -> Self {
203 let reactants = Arc::new(RwLock::new(reactants));
204 let raw_reactants = Arc::new(RwLock::new(raw_reactants));
205 let error_reactants = Arc::new(RwLock::new(error_reactants));
206 let reactants_clone = reactants.clone();
207 let raw_reactants_clone = raw_reactants.clone();
208 let error_reactants_clone = error_reactants.clone();
209
210 let ingress_queue = BackpressureQueue::new(
211 neuron.name(),
212 backpressure.unwrap_or_default(),
213 move |payload: Arc<PayloadRaw<T, C>>| {
214 Self::process_ingress(
215 reactants_clone.clone(),
216 raw_reactants_clone.clone(),
217 error_reactants_clone.clone(),
218 payload,
219 )
220 },
221 );
222
223 Self {
224 reactants,
225 raw_reactants,
226 error_reactants,
227 ingress_queue: RwLock::new(Some(Arc::new(ingress_queue))),
228 _codec_marker: PhantomData,
229 _phantom_t: PhantomData,
230 }
231 }
232
233 #[allow(clippy::type_complexity)]
234 async fn process_ingress(
235 rs: Arc<RwLock<Vec<Arc<dyn Reactant<T, C> + Send + Sync>>>>,
236 rrs: Arc<RwLock<Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>>>,
237 ers: Arc<RwLock<Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>>>,
238 payload: Arc<PayloadRaw<T, C>>,
239 ) {
240 let span = payload.span_debug("DendriteDecoder::process_ingress");
241 async move {
242 let neuron = payload.neuron.clone();
243 let decoded_value = match neuron.decode(&payload.value) {
244 Ok(value) => value,
245 Err(_) => return,
246 };
247
248 let decoded_payload = Arc::new(Payload::from_parts(
249 Arc::new(decoded_value),
250 neuron.clone(),
251 payload.trace,
252 ));
253
254 let reactants_vec: Vec<_> = {
255 let guard = rs.read();
256 guard.iter().cloned().collect()
257 };
258
259 let raw_reactants_vec: Vec<_> = {
260 let guard = rrs.read();
261 guard.iter().cloned().collect()
262 };
263
264 let error_reactants_vec: Vec<_> = {
265 let guard = ers.read();
266 guard.iter().cloned().collect()
267 };
268
269 if reactants_vec.is_empty() && raw_reactants_vec.is_empty() {
270 return;
271 }
272
273 let decoded_futures = reactants_vec
274 .iter()
275 .map(|reactant| reactant.react(decoded_payload.clone()));
276
277 let raw_futures = raw_reactants_vec
278 .iter()
279 .map(|raw_reactant| raw_reactant.react(payload.clone()));
280
281 let (decoded_results, raw_results) =
282 futures_util::future::join(join_all(decoded_futures), join_all(raw_futures)).await;
283
284 let mut errors = Vec::new();
285 for res in decoded_results {
286 if let Err(e) = res {
287 errors.push(e);
288 }
289 }
290 for res in raw_results {
291 if let Err(e) = res {
292 errors.push(e);
293 }
294 }
295
296 if !errors.is_empty() && !error_reactants_vec.is_empty() {
297 let error_futures = errors.into_iter().flat_map(|err| {
298 let err_arc = Arc::new(err);
299 let p = decoded_payload.clone();
300 error_reactants_vec
301 .iter()
302 .map(move |er| er.react_error(err_arc.clone(), p.clone()))
303 });
304 join_all(error_futures).await;
305 } else if !errors.is_empty() {
306 for e in errors {
307 tracing::error!("Reactant error: {e}");
308 }
309 }
310 }
311 .instrument(span)
312 .await
313 }
314
315 pub fn add_reactants(
317 &self,
318 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
319 ) -> Result<(), DendriteError> {
320 if !reactants.is_empty() {
321 let mut write_guard = self.reactants.write();
322 write_guard.extend(reactants);
323 }
324 Ok(())
325 }
326
327 pub fn add_raw_reactants(
329 &self,
330 raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
331 ) -> Result<(), DendriteError> {
332 if !raw_reactants.is_empty() {
333 let mut write_guard = self.raw_reactants.write();
334 write_guard.extend(raw_reactants);
335 }
336 Ok(())
337 }
338
339 pub fn add_error_reactants(
341 &self,
342 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
343 ) -> Result<(), DendriteError> {
344 if !error_reactants.is_empty() {
345 let mut write_guard = self.error_reactants.write();
346 write_guard.extend(error_reactants);
347 }
348 Ok(())
349 }
350
351 #[allow(clippy::type_complexity)]
352 pub fn transduce(
353 &self,
354 payload: Arc<PayloadRaw<T, C>>,
355 ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), DendriteError>> + Send + 'static>>
356 {
357 let queue_lock = self.ingress_queue.read();
358 let queue: Arc<BackpressureQueue<Arc<PayloadRaw<T, C>>>> = match &*queue_lock {
359 Some(q) => q.clone(),
360 None => {
361 return Box::pin(async move { Ok((vec![], vec![])) });
362 }
363 };
364 drop(queue_lock);
365
366 Box::pin(async move {
367 queue.push(payload).await.map_err(|e| match e {
368 SynapseError::QueueFull { neuron_name } => {
369 DendriteError::ReactantsWriteLock { neuron_name }
370 }
371 _ => DendriteError::ReactantsReadLock {
372 neuron_name: "unknown".to_string(),
373 },
374 })?;
375 Ok((vec![], vec![]))
378 })
379 }
380}
381
/// Tests for [`Dendrite`] and [`DendriteDecoder`], using channel-backed reactants
/// from `crate::test_utils` to observe what gets delivered.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::neuron::NeuronImpl;
    use crate::test_utils::{
        DebugCodec, DebugStruct, TokioMpscReactant, TokioMpscReactantRaw, test_namespace,
    };
    use std::thread;
    use std::time::Duration;
    use tokio::sync::mpsc::{Receiver, channel};
    use uuid::Uuid;

    type DecodedPayload = Arc<Payload<DebugStruct, DebugCodec>>;
    type RawPayload = Arc<PayloadRaw<DebugStruct, DebugCodec>>;

    /// How long the decoder tests wait for the queue handler to deliver a message.
    const RECV_TIMEOUT: Duration = Duration::from_millis(100);

    /// Receives the next message from `rx`, panicking if it does not arrive within
    /// [`RECV_TIMEOUT`] or if the channel is closed. `what` names the channel in
    /// the panic message.
    async fn recv_within<M>(rx: &mut Receiver<M>, what: &str) -> M {
        tokio::time::timeout(RECV_TIMEOUT, rx.recv())
            .await
            .unwrap_or_else(|_| panic!("Timeout waiting for {what}"))
            .unwrap_or_else(|| panic!("Channel closed while waiting for {what}"))
    }

    /// A single reactant receives the payload with its value and correlation id intact.
    #[tokio::test]
    async fn test_dendrite_transduce() {
        let ns = test_namespace();
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
            Arc::new(neuron_impl);

        let (tx, mut rx) = channel::<DecodedPayload>(1);

        let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
            vec![Arc::new(TokioMpscReactant { sender: tx })];
        let dendrite = Dendrite::new(neuron_arc.clone(), reactants, vec![]);

        let debug_struct_val = DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        };
        let uuid = Uuid::now_v7();
        dendrite
            .transduce(Payload::with_correlation(
                debug_struct_val.clone(),
                neuron_arc.clone(),
                Some(uuid),
            ))
            .await
            .expect("transduce should succeed");

        assert_eq!(rx.len(), 1);
        let p = rx.recv().await.unwrap();
        assert_eq!(*p.value, debug_struct_val);
        assert_eq!(p.correlation_id(), uuid);
    }

    /// Every registered reactant receives the same payload.
    #[tokio::test]
    async fn test_dendrite_multiple_reactants() {
        let ns = test_namespace();
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
            Arc::new(neuron_impl);

        let (tx1, mut rx1) = channel::<DecodedPayload>(1);
        let (tx2, mut rx2) = channel::<DecodedPayload>(1);

        let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> = vec![
            Arc::new(TokioMpscReactant { sender: tx1 }),
            Arc::new(TokioMpscReactant { sender: tx2 }),
        ];
        let dendrite = Dendrite::new(neuron_arc.clone(), reactants, vec![]);

        let debug_struct_val = DebugStruct {
            foo: 100,
            bar: "multi_test".to_owned(),
        };
        let uuid = Uuid::now_v7();
        let payload_value = Arc::new(debug_struct_val.clone());

        dendrite
            .transduce(Payload::with_correlation(
                debug_struct_val.clone(),
                neuron_arc.clone(),
                Some(uuid),
            ))
            .await
            .expect("transduce should succeed");

        assert_eq!(rx1.len(), 1);
        let p1 = rx1.recv().await.unwrap();
        assert_eq!(p1.value, payload_value);
        assert_eq!(p1.correlation_id(), uuid);

        assert_eq!(rx2.len(), 1);
        let p2 = rx2.recv().await.unwrap();
        assert_eq!(p2.value, payload_value);
        assert_eq!(p2.correlation_id(), uuid);
    }

    /// A raw payload reaches the decoded reactant in decoded form and the raw
    /// reactant unchanged.
    #[tokio::test]
    async fn test_decoder_dendrite_transduce() {
        let ns = test_namespace();
        let neuron_impl_for_encoding: NeuronImpl<DebugStruct, DebugCodec> =
            NeuronImpl::new(ns.clone());
        let neuron_arc_for_dendrite: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
            Arc::new(NeuronImpl::new(ns.clone()));

        let (tx, mut rx) = channel::<DecodedPayload>(1);
        let (tx_raw, mut rx_raw) = channel::<RawPayload>(1);

        let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
            vec![Arc::new(TokioMpscReactant { sender: tx })];
        let raw_reactants: Vec<Arc<dyn ReactantRaw<DebugStruct, DebugCodec> + Send + Sync>> =
            vec![Arc::new(TokioMpscReactantRaw { sender: tx_raw })];

        let dendrite_decoder = DendriteDecoder::new(
            neuron_arc_for_dendrite.clone(),
            reactants,
            raw_reactants,
            vec![],
            None,
        );

        let uuid = Uuid::now_v7();
        let debug_struct_val = DebugStruct {
            foo: 49,
            bar: "foo_bar".to_owned(),
        };
        let encoded = neuron_impl_for_encoding
            .encode(&debug_struct_val)
            .expect("Encoding should succeed in test");

        dendrite_decoder
            .transduce(PayloadRaw::with_correlation(
                encoded.clone(),
                neuron_arc_for_dendrite.clone(),
                Some(uuid),
            ))
            .await
            .expect("transduce should enqueue the payload");

        // Delivery happens in the queue handler, so wait for it with a timeout.
        let p = recv_within(&mut rx, "decoded message").await;
        assert_eq!(*p.value, debug_struct_val);
        assert_eq!(p.correlation_id(), uuid);

        let p2 = recv_within(&mut rx_raw, "raw message").await;
        assert_eq!(p2.value.as_slice(), encoded.as_slice());
        assert_eq!(p2.correlation_id(), uuid);
    }

    /// Reactants added after construction receive payloads alongside the initial ones.
    #[tokio::test]
    async fn test_dendrite_add_reactants() {
        let ns = test_namespace();
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
            Arc::new(neuron_impl);

        let (tx1, mut rx1) = channel::<DecodedPayload>(1);
        let initial_reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
            vec![Arc::new(TokioMpscReactant { sender: tx1 })];

        let dendrite = Dendrite::new(neuron_arc.clone(), initial_reactants, vec![]);

        let (tx2, mut rx2) = channel::<DecodedPayload>(1);
        let additional_reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
            vec![Arc::new(TokioMpscReactant { sender: tx2 })];

        dendrite
            .add_reactants(additional_reactants)
            .expect("add_reactants should succeed");

        let debug_struct_val = DebugStruct {
            foo: 42,
            bar: "test_add_reactants".to_owned(),
        };
        let uuid = Uuid::now_v7();
        let payload =
            Payload::with_correlation(debug_struct_val.clone(), neuron_arc.clone(), Some(uuid));

        dendrite
            .transduce(payload.clone())
            .await
            .expect("transduce should succeed");

        assert_eq!(rx1.len(), 1);
        let p1 = rx1.recv().await.unwrap();
        assert_eq!(*p1.value, debug_struct_val);
        assert_eq!(p1.correlation_id(), uuid);

        assert_eq!(rx2.len(), 1);
        let p2 = rx2.recv().await.unwrap();
        assert_eq!(*p2.value, debug_struct_val);
        assert_eq!(p2.correlation_id(), uuid);
    }

    /// Decoded and raw reactants added after construction both receive payloads.
    #[tokio::test]
    async fn test_dendrite_decoder_add_reactants() {
        let ns = test_namespace();
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
            Arc::new(neuron_impl);

        let (tx1, mut rx1) = channel::<DecodedPayload>(1);
        let initial_reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
            vec![Arc::new(TokioMpscReactant { sender: tx1 })];

        let (tx_raw1, mut rx_raw1) = channel::<RawPayload>(1);
        let initial_raw_reactants: Vec<
            Arc<dyn ReactantRaw<DebugStruct, DebugCodec> + Send + Sync>,
        > = vec![Arc::new(TokioMpscReactantRaw { sender: tx_raw1 })];

        let dendrite_decoder = DendriteDecoder::new(
            neuron_arc.clone(),
            initial_reactants,
            initial_raw_reactants,
            vec![],
            None,
        );

        let (tx2, mut rx2) = channel::<DecodedPayload>(1);
        let additional_reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
            vec![Arc::new(TokioMpscReactant { sender: tx2 })];

        let (tx_raw2, mut rx_raw2) = channel::<RawPayload>(1);
        let additional_raw_reactants: Vec<
            Arc<dyn ReactantRaw<DebugStruct, DebugCodec> + Send + Sync>,
        > = vec![Arc::new(TokioMpscReactantRaw { sender: tx_raw2 })];

        dendrite_decoder
            .add_reactants(additional_reactants)
            .expect("add_reactants should succeed");
        dendrite_decoder
            .add_raw_reactants(additional_raw_reactants)
            .expect("add_raw_reactants should succeed");

        let debug_struct_val = DebugStruct {
            foo: 42,
            bar: "test_add_reactants_decoder".to_owned(),
        };
        let uuid = Uuid::now_v7();
        let encoded = neuron_arc
            .encode(&debug_struct_val)
            .expect("Encoding should succeed in test");
        let payload_raw =
            PayloadRaw::with_correlation(encoded.clone(), neuron_arc.clone(), Some(uuid));

        dendrite_decoder
            .transduce(payload_raw.clone())
            .await
            .expect("transduce should enqueue the payload");

        let p1 = recv_within(&mut rx1, "rx1").await;
        assert_eq!(*p1.value, debug_struct_val);
        assert_eq!(p1.correlation_id(), uuid);

        let p2 = recv_within(&mut rx2, "rx2").await;
        assert_eq!(*p2.value, debug_struct_val);
        assert_eq!(p2.correlation_id(), uuid);

        let p_raw1 = recv_within(&mut rx_raw1, "raw_rx1").await;
        assert_eq!(p_raw1.value.as_slice(), encoded.as_slice());
        assert_eq!(p_raw1.correlation_id(), uuid);

        let p_raw2 = recv_within(&mut rx_raw2, "raw_rx2").await;
        assert_eq!(p_raw2.value.as_slice(), encoded.as_slice());
        assert_eq!(p_raw2.correlation_id(), uuid);
    }

    /// Several OS threads, each with its own runtime, can call `transduce` on a
    /// shared dendrite; every call delivers one message.
    #[tokio::test]
    async fn test_dendrite_concurrent_readers() {
        let ns = test_namespace();
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
            Arc::new(neuron_impl);

        // Capacity exceeds the number of senders so no send has to wait for a receive.
        let (tx1, mut rx1) = channel::<DecodedPayload>(10);
        let initial_reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
            vec![Arc::new(TokioMpscReactant { sender: tx1 })];

        let dendrite = Arc::new(Dendrite::new(neuron_arc.clone(), initial_reactants, vec![]));

        let debug_struct_val = DebugStruct {
            foo: 42,
            bar: "test_concurrent_readers".to_owned(),
        };
        let uuid = Uuid::now_v7();
        let payload =
            Payload::with_correlation(debug_struct_val.clone(), neuron_arc.clone(), Some(uuid));

        let num_threads = 5;
        let mut handles = vec![];

        for _ in 0..num_threads {
            let dendrite_clone = dendrite.clone();
            let payload_clone = payload.clone();

            let handle = thread::spawn(move || {
                let rt = tokio::runtime::Runtime::new().unwrap();
                rt.block_on(async {
                    dendrite_clone
                        .transduce(payload_clone)
                        .await
                        .expect("transduce should succeed");
                });
            });

            handles.push(handle);
        }

        // A panic inside any worker thread surfaces here.
        for handle in handles {
            handle.join().unwrap();
        }

        for _ in 0..num_threads {
            let p = rx1.recv().await.unwrap();
            assert_eq!(*p.value, debug_struct_val);
            assert_eq!(p.correlation_id(), uuid);
        }
    }
}