1use crate::codec::{Codec, CodecName};
8use crate::logging::LogTrace;
9use crate::neuron::Neuron;
10use crate::payload::{Payload, PayloadRaw};
11use crate::reactant::{ErrorReactant, Reactant, ReactantRaw};
12use crate::synapse::{BackpressureConfig, BackpressureQueue, SynapseError};
13use futures_util::future::join_all;
14use std::future::Future;
15use std::marker::PhantomData;
16use std::pin::Pin;
17use std::sync::{Arc, RwLock};
18use thiserror::Error;
19use tracing::Instrument;
20
21#[derive(Error, Debug)]
22pub enum DendriteError {
23 #[error("Failed to acquire read lock on reactants for neuron '{neuron_name}'")]
24 ReactantsReadLock { neuron_name: String },
25 #[error("Failed to acquire write lock on reactants for neuron '{neuron_name}'")]
26 ReactantsWriteLock { neuron_name: String },
27 #[error("Failed to acquire read lock on raw reactants for neuron '{neuron_name}'")]
28 RawReactantsReadLock { neuron_name: String },
29 #[error("Failed to acquire write lock on raw reactants for neuron '{neuron_name}'")]
30 RawReactantsWriteLock { neuron_name: String },
31 #[error("Failed to acquire read lock on error reactants for neuron '{neuron_name}'")]
32 ErrorReactantsReadLock { neuron_name: String },
33 #[error("Failed to acquire write lock on error reactants for neuron '{neuron_name}'")]
34 ErrorReactantsWriteLock { neuron_name: String },
35 #[error("Internal error: {0}")]
36 Other(String),
37}
38
39pub struct Dendrite<T, C>
40where
41 C: Codec<T> + CodecName + Send + Sync + 'static,
42 T: Sync + Send + 'static,
43{
44 _neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
45 reactants: RwLock<Vec<Arc<dyn Reactant<T, C> + Send + Sync>>>,
46 error_reactants: RwLock<Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>>,
47 _codec_marker: PhantomData<fn() -> &'static ()>,
48 _phantom_t: PhantomData<T>,
49}
50
51impl<T, C> Dendrite<T, C>
52where
53 C: Codec<T> + CodecName + Send + Sync + 'static,
54 T: Sync + Send + 'static,
55{
56 #[must_use]
57 pub fn new(
58 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
59 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
60 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
61 ) -> Self {
62 Self {
63 _neuron: neuron,
64 reactants: RwLock::new(reactants),
65 error_reactants: RwLock::new(error_reactants),
66 _codec_marker: PhantomData,
67 _phantom_t: PhantomData,
68 }
69 }
70
71 pub fn add_reactants(
73 &self,
74 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
75 ) -> Result<(), DendriteError> {
76 if !reactants.is_empty() {
77 let mut write_guard =
78 self.reactants
79 .write()
80 .map_err(|_| DendriteError::ReactantsWriteLock {
81 neuron_name: self._neuron.name(),
82 })?;
83 write_guard.extend(reactants);
84 }
85 Ok(())
86 }
87
88 pub fn add_error_reactants(
90 &self,
91 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
92 ) -> Result<(), DendriteError> {
93 if !error_reactants.is_empty() {
94 let mut write_guard = self.error_reactants.write().map_err(|_| {
95 DendriteError::ErrorReactantsWriteLock {
96 neuron_name: self._neuron.name(),
97 }
98 })?;
99 write_guard.extend(error_reactants);
100 }
101 Ok(())
102 }
103
104 pub fn transduce(
105 &self,
106 payload: Arc<Payload<T, C>>,
107 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, DendriteError>> + Send + 'static>> {
108 tracing::debug!("Dendrite::transduce called");
109
110 let neuron_name = self._neuron.name();
111 let reactants_result =
112 self.reactants
113 .read()
114 .map_err(|_| DendriteError::ReactantsReadLock {
115 neuron_name: neuron_name.clone(),
116 });
117 let reactants_clone = match reactants_result {
118 Ok(guard) => {
119 if guard.is_empty() {
120 tracing::debug!("Dendrite::transduce no reactants, returning empty vec");
121 return Box::pin(async move { Ok(vec![]) });
122 }
123 guard.clone()
124 }
125 Err(e) => return Box::pin(async move { Err(e) }),
126 };
127
128 let error_reactants_result =
130 self.error_reactants
131 .read()
132 .map_err(|_| DendriteError::ReactantsReadLock {
133 neuron_name: neuron_name.clone(),
135 });
136 let error_reactants_clone = match error_reactants_result {
137 Ok(guard) => guard.clone(),
138 Err(_) => Vec::new(), };
140
141 let payload_clone = payload.clone();
142 tracing::debug!(
143 "Dendrite::transduce - Cloned {} reactants",
144 reactants_clone.len()
145 );
146
147 Box::pin(
148 async move {
149 tracing::debug!(
150 "Dendrite::transduce creating futures for {} reactants",
151 reactants_clone.len()
152 );
153 let futures = reactants_clone
154 .iter()
155 .map(|reactant| reactant.react(payload_clone.clone()))
156 .collect::<Vec<_>>();
157
158 tracing::debug!(
159 "Dendrite::transduce awaiting join_all of {} futures",
160 futures.len()
161 );
162 let results = join_all(futures).await;
163 tracing::debug!("Dendrite::transduce join_all completed");
164
165 let mut errors = Vec::new();
166 let successes: Vec<()> = results
167 .into_iter()
168 .filter_map(|r| match r {
169 Ok(_) => Some(()),
170 Err(e) => {
171 errors.push(e);
172 None
173 }
174 })
175 .collect();
176
177 if !errors.is_empty() && !error_reactants_clone.is_empty() {
178 let error_futures = errors.into_iter().flat_map(|err| {
180 let err_arc = Arc::new(err);
181 let p = payload_clone.clone();
182 error_reactants_clone
183 .iter()
184 .map(move |er| er.react_error(err_arc.clone(), p.clone()))
185 });
186 join_all(error_futures).await;
187 } else if !errors.is_empty() {
188 for e in errors {
189 tracing::error!("Reactant error: {e}");
190 }
191 }
192
193 Ok(successes)
194 }
195 .instrument(payload.span_debug("Dendrite::transduce")),
196 )
197 }
198}
199
200#[allow(clippy::type_complexity)]
201pub struct DendriteDecoder<T, C>
202where
203 C: Codec<T> + CodecName + Send + Sync + 'static,
204 T: Send + Sync + 'static,
205{
206 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
207 reactants: Arc<RwLock<Vec<Arc<dyn Reactant<T, C> + Send + Sync>>>>,
208 raw_reactants: Arc<RwLock<Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>>>,
209 error_reactants: Arc<RwLock<Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>>>,
210 ingress_queue: RwLock<Option<Arc<BackpressureQueue<Arc<PayloadRaw<T, C>>>>>>,
211 _codec_marker: PhantomData<fn() -> &'static ()>,
212 _phantom_t: PhantomData<T>,
213}
214
215impl<T, C> DendriteDecoder<T, C>
216where
217 C: Codec<T> + CodecName + Send + Sync + 'static,
218 T: Send + Sync + 'static,
219{
220 #[must_use]
221 pub fn new(
222 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
223 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
224 raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
225 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
226 backpressure: Option<BackpressureConfig>,
227 ) -> Self {
228 let reactants = Arc::new(RwLock::new(reactants));
229 let raw_reactants = Arc::new(RwLock::new(raw_reactants));
230 let error_reactants = Arc::new(RwLock::new(error_reactants));
231 let neuron_clone = neuron.clone();
232 let reactants_clone = reactants.clone();
233 let raw_reactants_clone = raw_reactants.clone();
234 let error_reactants_clone = error_reactants.clone();
235
236 let ingress_queue = BackpressureQueue::new(
237 neuron.name(),
238 backpressure.unwrap_or_default(),
239 move |payload: Arc<PayloadRaw<T, C>>| {
240 Self::process_ingress(
241 neuron_clone.clone(),
242 reactants_clone.clone(),
243 raw_reactants_clone.clone(),
244 error_reactants_clone.clone(),
245 payload,
246 )
247 },
248 );
249
250 Self {
251 neuron,
252 reactants,
253 raw_reactants,
254 error_reactants,
255 ingress_queue: RwLock::new(Some(Arc::new(ingress_queue))),
256 _codec_marker: PhantomData,
257 _phantom_t: PhantomData,
258 }
259 }
260
261 #[allow(clippy::type_complexity)]
262 async fn process_ingress(
263 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
264 rs: Arc<RwLock<Vec<Arc<dyn Reactant<T, C> + Send + Sync>>>>,
265 rrs: Arc<RwLock<Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>>>,
266 ers: Arc<RwLock<Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>>>,
267 payload: Arc<PayloadRaw<T, C>>,
268 ) {
269 let span = payload.span_debug("DendriteDecoder::process_ingress");
270 async move {
271 let decoded_value = match neuron.decode(&payload.value) {
272 Ok(value) => value,
273 Err(_) => return,
274 };
275
276 let decoded_payload = Arc::new(Payload::from_parts(
277 Arc::new(decoded_value),
278 neuron.clone(),
279 payload.trace,
280 ));
281
282 let reactants_vec: Vec<_> = {
283 let guard = rs.read().unwrap();
284 guard.iter().cloned().collect()
285 };
286
287 let raw_reactants_vec: Vec<_> = {
288 let guard = rrs.read().unwrap();
289 guard.iter().cloned().collect()
290 };
291
292 let error_reactants_vec: Vec<_> = {
293 let guard = ers.read().unwrap();
294 guard.iter().cloned().collect()
295 };
296
297 if reactants_vec.is_empty() && raw_reactants_vec.is_empty() {
298 return;
299 }
300
301 let decoded_futures = reactants_vec
302 .iter()
303 .map(|reactant| reactant.react(decoded_payload.clone()));
304
305 let raw_futures = raw_reactants_vec
306 .iter()
307 .map(|raw_reactant| raw_reactant.react(payload.clone()));
308
309 let (decoded_results, raw_results) =
310 futures_util::future::join(join_all(decoded_futures), join_all(raw_futures)).await;
311
312 let mut errors = Vec::new();
313 for res in decoded_results {
314 if let Err(e) = res {
315 errors.push(e);
316 }
317 }
318 for res in raw_results {
319 if let Err(e) = res {
320 errors.push(e);
321 }
322 }
323
324 if !errors.is_empty() && !error_reactants_vec.is_empty() {
325 let error_futures = errors.into_iter().flat_map(|err| {
326 let err_arc = Arc::new(err);
327 let p = decoded_payload.clone();
328 error_reactants_vec
329 .iter()
330 .map(move |er| er.react_error(err_arc.clone(), p.clone()))
331 });
332 join_all(error_futures).await;
333 } else if !errors.is_empty() {
334 for e in errors {
335 tracing::error!("Reactant error: {e}");
336 }
337 }
338 }
339 .instrument(span)
340 .await
341 }
342
343 pub fn add_reactants(
345 &self,
346 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
347 ) -> Result<(), DendriteError> {
348 if !reactants.is_empty() {
349 let mut write_guard =
350 self.reactants
351 .write()
352 .map_err(|_| DendriteError::ReactantsWriteLock {
353 neuron_name: self.neuron.name(),
354 })?;
355 write_guard.extend(reactants);
356 }
357 Ok(())
358 }
359
360 pub fn add_raw_reactants(
362 &self,
363 raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
364 ) -> Result<(), DendriteError> {
365 if !raw_reactants.is_empty() {
366 let mut write_guard =
367 self.raw_reactants
368 .write()
369 .map_err(|_| DendriteError::RawReactantsWriteLock {
370 neuron_name: self.neuron.name(),
371 })?;
372 write_guard.extend(raw_reactants);
373 }
374 Ok(())
375 }
376
377 pub fn add_error_reactants(
379 &self,
380 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
381 ) -> Result<(), DendriteError> {
382 if !error_reactants.is_empty() {
383 let mut write_guard = self.error_reactants.write().map_err(|_| {
384 DendriteError::ErrorReactantsWriteLock {
385 neuron_name: self.neuron.name(),
386 }
387 })?;
388 write_guard.extend(error_reactants);
389 }
390 Ok(())
391 }
392
393 #[allow(clippy::type_complexity)]
394 pub fn transduce(
395 &self,
396 payload: Arc<PayloadRaw<T, C>>,
397 ) -> Pin<Box<dyn Future<Output = Result<(Vec<()>, Vec<()>), DendriteError>> + Send + 'static>>
398 {
399 let queue_lock = self.ingress_queue.read().unwrap();
400 let queue = match &*queue_lock {
401 Some(q) => q.clone(),
402 None => {
403 return Box::pin(async move { Ok((vec![], vec![])) });
404 }
405 };
406 drop(queue_lock);
407
408 Box::pin(async move {
409 queue.push(payload).await.map_err(|e| match e {
410 SynapseError::QueueFull { neuron_name } => {
411 DendriteError::ReactantsWriteLock { neuron_name }
412 }
413 _ => DendriteError::ReactantsReadLock {
414 neuron_name: "unknown".to_string(),
415 },
416 })?;
417 Ok((vec![], vec![]))
420 })
421 }
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::neuron::NeuronImpl;
    use crate::test_utils::{
        DebugCodec, DebugStruct, TokioMpscReactant, TokioMpscReactantRaw, test_namespace,
    };
    use std::thread;
    use std::time::Duration;
    use tokio::sync::mpsc::channel;
    use uuid::Uuid;

    // Shorthand for the trait-object types used throughout these tests.
    type NeuronHandle = Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync>;
    type ReactantList = Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>>;
    type RawReactantList = Vec<Arc<dyn ReactantRaw<DebugStruct, DebugCodec> + Send + Sync>>;
    type DecodedPayload = Arc<Payload<DebugStruct, DebugCodec>>;
    type RawPayload = Arc<PayloadRaw<DebugStruct, DebugCodec>>;

    /// How long the decoder tests wait for a message to reach a channel.
    const RECV_TIMEOUT: Duration = Duration::from_millis(100);

    /// Builds a neuron over the test namespace as a shared trait object.
    fn new_test_neuron() -> NeuronHandle {
        let ns = test_namespace();
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        Arc::new(neuron_impl)
    }

    /// Receives one message within `RECV_TIMEOUT`, panicking with
    /// `timeout_msg` on timeout or `closed_msg` if the channel is closed.
    async fn recv_within<M>(
        rx: &mut tokio::sync::mpsc::Receiver<M>,
        timeout_msg: &str,
        closed_msg: &str,
    ) -> M {
        tokio::time::timeout(RECV_TIMEOUT, rx.recv())
            .await
            .expect(timeout_msg)
            .expect(closed_msg)
    }

    /// A single reactant receives the transduced payload unchanged.
    #[tokio::test]
    async fn test_dendrite_transduce() {
        let neuron = new_test_neuron();

        let (sender, mut receiver) = channel::<DecodedPayload>(1);
        let reactants: ReactantList = vec![Arc::new(TokioMpscReactant { sender })];
        let dendrite = Dendrite::new(neuron.clone(), reactants, vec![]);

        let expected = DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        };
        let correlation = Uuid::now_v7();
        let outgoing =
            Payload::with_correlation(expected.clone(), neuron.clone(), Some(correlation));
        let _ = dendrite.transduce(outgoing).await;

        assert_eq!(receiver.len(), 1);
        let delivered = receiver.recv().await.unwrap();
        assert_eq!(*delivered.value, expected);
        assert_eq!(delivered.correlation_id(), correlation);
    }

    /// Every registered reactant receives its own copy of the payload.
    #[tokio::test]
    async fn test_dendrite_multiple_reactants() {
        let neuron = new_test_neuron();

        let (first_tx, mut first_rx) = channel::<DecodedPayload>(1);
        let (second_tx, mut second_rx) = channel::<DecodedPayload>(1);

        let reactants: ReactantList = vec![
            Arc::new(TokioMpscReactant { sender: first_tx }),
            Arc::new(TokioMpscReactant { sender: second_tx }),
        ];
        let dendrite = Dendrite::new(neuron.clone(), reactants, vec![]);

        let sample = DebugStruct {
            foo: 100,
            bar: "multi_test".to_owned(),
        };
        let correlation = Uuid::now_v7();
        let expected_value = Arc::new(sample.clone());

        let outgoing = Payload::with_correlation(sample.clone(), neuron.clone(), Some(correlation));
        let _ = dendrite.transduce(outgoing).await;

        assert_eq!(first_rx.len(), 1);

        let first = first_rx.recv().await.unwrap();
        assert_eq!(first.value, expected_value);
        assert_eq!(first.correlation_id(), correlation);

        assert_eq!(second_rx.len(), 1);
        let second = second_rx.recv().await.unwrap();
        assert_eq!(second.value, expected_value);
        assert_eq!(second.correlation_id(), correlation);
    }

    /// The decoder delivers the decoded value to reactants and the raw bytes
    /// to raw reactants.
    #[tokio::test]
    async fn test_decoder_dendrite_transduce() {
        // Both neurons are built from the same namespace value.
        let ns = test_namespace();
        let encoding_neuron: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let dendrite_neuron: NeuronHandle = Arc::new(NeuronImpl::new(ns.clone()));

        let (sender, mut decoded_rx) = channel::<DecodedPayload>(1);
        let (raw_sender, mut raw_rx) = channel::<RawPayload>(1);

        let reactants: ReactantList = vec![Arc::new(TokioMpscReactant { sender })];
        let raw_reactants: RawReactantList =
            vec![Arc::new(TokioMpscReactantRaw { sender: raw_sender })];

        let decoder = DendriteDecoder::new(
            dendrite_neuron.clone(),
            reactants,
            raw_reactants,
            vec![],
            None,
        );

        let correlation = Uuid::now_v7();
        let sample = DebugStruct {
            foo: 49,
            bar: "foo_bar".to_owned(),
        };
        let encoded = encoding_neuron
            .encode(&sample)
            .expect("Encoding should succeed in test");

        let incoming = PayloadRaw::with_correlation(
            encoded.clone(),
            Some(dendrite_neuron.clone()),
            Some(correlation),
        );
        let _ = decoder.transduce(incoming).await;

        let decoded = recv_within(
            &mut decoded_rx,
            "Timeout waiting for decoded message",
            "Channel closed",
        )
        .await;
        assert_eq!(*decoded.value, sample);
        assert_eq!(decoded.correlation_id(), correlation);

        let raw = recv_within(
            &mut raw_rx,
            "Timeout waiting for raw message",
            "Channel closed",
        )
        .await;
        assert_eq!(raw.value.as_slice(), encoded.as_slice());
        assert_eq!(raw.correlation_id(), correlation);
    }

    /// Reactants added after construction also receive payloads.
    #[tokio::test]
    async fn test_dendrite_add_reactants() {
        let neuron = new_test_neuron();

        let (initial_tx, mut initial_rx) = channel::<DecodedPayload>(1);
        let initial: ReactantList = vec![Arc::new(TokioMpscReactant { sender: initial_tx })];

        let dendrite = Dendrite::new(neuron.clone(), initial, vec![]);

        let (added_tx, mut added_rx) = channel::<DecodedPayload>(1);
        let added: ReactantList = vec![Arc::new(TokioMpscReactant { sender: added_tx })];

        let _ = dendrite.add_reactants(added);

        let sample = DebugStruct {
            foo: 42,
            bar: "test_add_reactants".to_owned(),
        };
        let correlation = Uuid::now_v7();
        let outgoing = Payload::with_correlation(sample.clone(), neuron.clone(), Some(correlation));

        let _ = dendrite.transduce(outgoing.clone()).await;

        assert_eq!(initial_rx.len(), 1);
        let from_initial = initial_rx.recv().await.unwrap();
        assert_eq!(*from_initial.value, sample);
        assert_eq!(from_initial.correlation_id(), correlation);

        assert_eq!(added_rx.len(), 1);
        let from_added = added_rx.recv().await.unwrap();
        assert_eq!(*from_added.value, sample);
        assert_eq!(from_added.correlation_id(), correlation);
    }

    /// Reactants and raw reactants added to a decoder after construction
    /// receive payloads alongside the initial ones.
    #[tokio::test]
    async fn test_dendrite_decoder_add_reactants() {
        let neuron = new_test_neuron();

        let (initial_tx, mut initial_rx) = channel::<DecodedPayload>(1);
        let initial: ReactantList = vec![Arc::new(TokioMpscReactant { sender: initial_tx })];

        let (initial_raw_tx, mut initial_raw_rx) = channel::<RawPayload>(1);
        let initial_raw: RawReactantList = vec![Arc::new(TokioMpscReactantRaw {
            sender: initial_raw_tx,
        })];

        let decoder = DendriteDecoder::new(neuron.clone(), initial, initial_raw, vec![], None);

        let (added_tx, mut added_rx) = channel::<DecodedPayload>(1);
        let added: ReactantList = vec![Arc::new(TokioMpscReactant { sender: added_tx })];

        let (added_raw_tx, mut added_raw_rx) = channel::<RawPayload>(1);
        let added_raw: RawReactantList = vec![Arc::new(TokioMpscReactantRaw {
            sender: added_raw_tx,
        })];

        let _ = decoder.add_reactants(added);
        let _ = decoder.add_raw_reactants(added_raw);

        let sample = DebugStruct {
            foo: 42,
            bar: "test_add_reactants_decoder".to_owned(),
        };
        let correlation = Uuid::now_v7();
        let encoded = neuron
            .encode(&sample)
            .expect("Encoding should succeed in test");
        let incoming =
            PayloadRaw::with_correlation(encoded.clone(), Some(neuron.clone()), Some(correlation));

        let _ = decoder.transduce(incoming.clone()).await;

        let from_initial = recv_within(&mut initial_rx, "Timeout rx1", "Closed rx1").await;
        assert_eq!(*from_initial.value, sample);
        assert_eq!(from_initial.correlation_id(), correlation);

        let from_added = recv_within(&mut added_rx, "Timeout rx2", "Closed rx2").await;
        assert_eq!(*from_added.value, sample);
        assert_eq!(from_added.correlation_id(), correlation);

        let raw_from_initial =
            recv_within(&mut initial_raw_rx, "Timeout raw_rx1", "Closed raw_rx1").await;
        assert_eq!(raw_from_initial.value.as_slice(), encoded.as_slice());
        assert_eq!(raw_from_initial.correlation_id(), correlation);

        let raw_from_added =
            recv_within(&mut added_raw_rx, "Timeout raw_rx2", "Closed raw_rx2").await;
        assert_eq!(raw_from_added.value.as_slice(), encoded.as_slice());
        assert_eq!(raw_from_added.correlation_id(), correlation);
    }

    /// Several OS threads, each with its own runtime, can transduce through
    /// the same dendrite; each call delivers one message.
    #[tokio::test]
    async fn test_dendrite_concurrent_readers() {
        let neuron = new_test_neuron();

        let (sender, mut receiver) = channel::<DecodedPayload>(10);
        let reactants: ReactantList = vec![Arc::new(TokioMpscReactant { sender })];

        let dendrite = Arc::new(Dendrite::new(neuron.clone(), reactants, vec![]));

        let sample = DebugStruct {
            foo: 42,
            bar: "test_concurrent_readers".to_owned(),
        };
        let correlation = Uuid::now_v7();
        let outgoing = Payload::with_correlation(sample.clone(), neuron.clone(), Some(correlation));

        let num_threads = 5;

        // Collect eagerly so every thread is started before any is joined.
        let workers: Vec<_> = (0..num_threads)
            .map(|_| {
                let dendrite = Arc::clone(&dendrite);
                let payload = outgoing.clone();
                thread::spawn(move || {
                    let rt = tokio::runtime::Runtime::new().unwrap();
                    rt.block_on(async {
                        let _ = dendrite.transduce(payload).await;
                    });
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        for _ in 0..num_threads {
            let delivered = receiver.recv().await.unwrap();
            assert_eq!(*delivered.value, sample);
            assert_eq!(delivered.correlation_id(), correlation);
        }
    }
}