1use crate::codec::{Codec, CodecName};
8use crate::dendrite::{Dendrite, DendriteError};
9use crate::erasure::neuron::{NeuronErased, NeuronErasedWrapper};
10use crate::erasure::reactant::ReactantErased;
11use crate::logging::LogTrace;
12use crate::neuron::Neuron;
13use crate::payload::{Payload, PayloadRaw};
14use crate::reactant::{ErrorReactant, Reactant, ReactantRaw};
15use std::collections::VecDeque;
16use std::future::Future;
17use std::marker::PhantomData;
18use std::pin::Pin;
19use std::sync::{Arc, Mutex as StdMutex, RwLock};
20use thiserror::Error;
21
22#[derive(Error, Debug)]
23pub enum SynapseError {
24 #[error("Queue for neuron '{neuron_name}' is full")]
25 QueueFull { neuron_name: String },
26 #[error(transparent)]
27 Dendrite(#[from] DendriteError),
28 #[error("Type conversion failed for neuron '{neuron_name}'")]
29 NeuronTypeConversion { neuron_name: String },
30 #[error("Type conversion failed for reactant in neuron '{neuron_name}'")]
31 ReactantTypeConversion { neuron_name: String },
32 #[error("No dendrite available for processing in neuron '{neuron_name}'")]
33 NoDendrite { neuron_name: String },
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
37pub enum BackpressureStrategy {
38 Block,
39 DropOldest,
40 DropNewest,
41 Reject,
42}
43
44#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
45pub struct BackpressureConfig {
46 pub queue_size: usize,
47 pub strategy: BackpressureStrategy,
48}
49
50impl Default for BackpressureConfig {
51 fn default() -> Self {
52 Self {
53 queue_size: 1000,
54 strategy: BackpressureStrategy::Block,
55 }
56 }
57}
58
59use tokio::sync::Notify;
60use tracing::Instrument;
61
62pub struct BackpressureQueue<T: Send + 'static> {
64 inner: Arc<BackpressureQueueInner<T>>,
65}
66
67struct BackpressureQueueInner<T: Send + 'static> {
68 queue: StdMutex<VecDeque<T>>,
69
70 config: BackpressureConfig,
71
72 neuron_name: String,
73
74 item_added: Notify,
76
77 item_removed: Notify,
79}
80
81impl<T: Send + 'static> BackpressureQueue<T> {
82 pub fn new<F, Fut>(neuron_name: String, config: BackpressureConfig, mut processor: F) -> Self
83 where
84 F: FnMut(T) -> Fut + Send + 'static,
85 Fut: Future<Output = ()> + Send + 'static,
86 {
87 let inner = Arc::new(BackpressureQueueInner {
88 queue: StdMutex::new(VecDeque::with_capacity(config.queue_size)),
89
90 config,
91
92 neuron_name: neuron_name.clone(),
93
94 item_added: Notify::new(),
95
96 item_removed: Notify::new(),
97 });
98
99 let worker_inner = inner.clone();
100
101 tokio::spawn(async move {
102 loop {
103 let item = {
104 let mut queue = worker_inner.queue.lock().unwrap();
105
106 if let Some(item) = queue.pop_front() {
107 worker_inner.item_removed.notify_waiters();
108
109 Some(item)
110 } else {
111 None
112 }
113 };
114
115 if let Some(item) = item {
116 processor(item).await;
117 } else {
118 worker_inner.item_added.notified().await;
121 }
122 }
123 });
124
125 Self { inner }
126 }
127
128 pub async fn push(&self, item: T) -> Result<(), SynapseError> {
129 loop {
130 let should_wait = {
132 let mut queue = self.inner.queue.lock().unwrap();
133
134 if queue.len() < self.inner.config.queue_size {
135 queue.push_back(item);
136 self.inner.item_added.notify_one();
137 return Ok(());
138 }
139
140 match self.inner.config.strategy {
141 BackpressureStrategy::Block => true,
142 BackpressureStrategy::DropOldest => {
143 queue.pop_front();
144 queue.push_back(item);
145 self.inner.item_added.notify_one();
146 tracing::warn!(
147 neuron = %self.inner.neuron_name,
148 "Backpressure: Dropped oldest message (queue full)"
149 );
150 return Ok(());
151 }
152 BackpressureStrategy::DropNewest => {
153 tracing::warn!(
154 neuron = %self.inner.neuron_name,
155 "Backpressure: Dropped newest message (queue full)"
156 );
157 return Ok(()); }
159 BackpressureStrategy::Reject => {
160 tracing::warn!(
161 neuron = %self.inner.neuron_name,
162 "Backpressure: Rejected message (queue full)"
163 );
164 return Err(SynapseError::QueueFull {
165 neuron_name: self.inner.neuron_name.clone(),
166 });
167 }
168 }
169 };
170
171 if should_wait {
172 self.inner.item_removed.notified().await;
173 }
174 }
175 }
176}
177
178pub trait SynapseInternal<T, C>
179where
180 C: Codec<T> + CodecName + Send + Sync + 'static,
181 T: Send + Sync + 'static,
182{
183 fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync>;
184 fn transduce(
185 &self,
186 payload: Arc<Payload<T, C>>,
187 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>>;
188 fn transmit(
189 &self,
190 payload: Arc<Payload<T, C>>,
191 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>>;
192 fn react(
193 &mut self,
194 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
195 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
196 ) -> Result<(), SynapseError>;
197}
198
199pub trait SynapseExternal<T, C>
200where
201 C: Codec<T> + CodecName + Send + Sync + 'static,
202 T: Send + Sync + 'static,
203{
204 fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync>;
205 fn transduce(
206 &self,
207 payload: Arc<PayloadRaw<T, C>>,
208 ) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static;
209 fn transmit(
210 &self,
211 payload: Arc<PayloadRaw<T, C>>,
212 ) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static;
213 fn react(
214 &mut self,
215 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
216 raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
217 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
218 ) -> Result<(), SynapseError>;
219}
220
/// Something that can send a byte buffer to a named topic.
pub trait RawSender: Send + Sync {
    /// Sends `data` to `topic`.
    ///
    /// The returned boxed future resolves to `Ok(())` on success or to a
    /// textual error description on failure.
    fn send(
        &self,
        topic: &str,
        data: Vec<u8>,
    ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
}
229
230pub struct BackpressureSender<S: RawSender> {
232 queue: Arc<BackpressureQueue<(String, Vec<u8>)>>,
233 _marker: PhantomData<S>,
234}
235
236impl<S: RawSender + 'static> BackpressureSender<S> {
237 pub fn new(inner: S, config: BackpressureConfig, neuron_name: String) -> Self {
238 let inner_arc = Arc::new(inner);
239 let inner_clone = inner_arc.clone();
240
241 let queue = BackpressureQueue::<(String, Vec<u8>)>::new(
242 neuron_name,
243 config,
244 move |(topic, data)| {
245 let s = inner_clone.clone();
246 async move {
247 if let Err(e) = s.send(&topic, data).await {
248 eprintln!("BackpressureSender failed to send: {}", e);
249 }
250 }
251 },
252 );
253
254 Self {
255 queue: Arc::new(queue),
256 _marker: PhantomData,
257 }
258 }
259}
260
261impl<S: RawSender + 'static> RawSender for BackpressureSender<S> {
262 fn send(
263 &self,
264 topic: &str,
265 data: Vec<u8>,
266 ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> {
267 let q = self.queue.clone();
268 let topic = topic.to_string();
269 Box::pin(async move {
270 q.push((topic, data)).await.map_err(|e| e.to_string())
271 })
272 }
273}
274
275pub struct BackpressureExternalSynapse<T, C, S>
277where
278 C: Codec<T> + CodecName + Send + Sync + 'static,
279 T: Send + Sync + 'static,
280 S: SynapseExternal<T, C> + Send + Sync + 'static,
281{
282 inner: Arc<RwLock<S>>,
283 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
284 queue: Arc<BackpressureQueue<Arc<PayloadRaw<T, C>>>>,
285 _phantom: PhantomData<(T, C)>,
286}
287
288impl<T, C, S> BackpressureExternalSynapse<T, C, S>
289where
290 C: Codec<T> + CodecName + Send + Sync + 'static,
291 T: Send + Sync + 'static,
292 S: SynapseExternal<T, C> + Send + Sync + 'static,
293{
294 pub fn new(synapse: S, config: BackpressureConfig) -> Self {
295 let neuron = synapse.neuron();
296 let neuron_name = neuron.name();
297 let synapse_arc = Arc::new(RwLock::new(synapse));
298 let inner_clone = synapse_arc.clone();
299
300 let queue = BackpressureQueue::new(
301 neuron_name,
302 config,
303 move |payload: Arc<PayloadRaw<T, C>>| {
304 let s = inner_clone.clone();
305 async move {
306 let future = {
307 let guard = s.read().unwrap();
308 guard.transmit(payload)
309 };
310 let _ = future.await;
311 }
312 },
313 );
314
315 Self {
316 inner: synapse_arc,
317 neuron,
318 queue: Arc::new(queue),
319 _phantom: PhantomData,
320 }
321 }
322}
323
324impl<T, C, S> SynapseExternal<T, C> for BackpressureExternalSynapse<T, C, S>
325where
326 C: Codec<T> + CodecName + Send + Sync + 'static,
327 T: Send + Sync + 'static,
328 S: SynapseExternal<T, C> + Send + Sync + 'static,
329{
330 fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync> {
331 self.neuron.clone()
332 }
333
334 fn transduce(
335 &self,
336 payload: Arc<PayloadRaw<T, C>>,
337 ) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static {
338 self.transmit(payload)
339 }
340
341 fn transmit(
342 &self,
343 payload: Arc<PayloadRaw<T, C>>,
344 ) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static {
345 let q = self.queue.clone();
346 Box::pin(async move {
347 q.push(payload).await?;
348 Ok((vec![], vec![]))
349 })
350 }
351
352 fn react(
353 &mut self,
354 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
355 raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
356 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
357 ) -> Result<(), SynapseError> {
358 let mut guard = self.inner.write().unwrap();
359 guard.react(reactants, raw_reactants, error_reactants)
360 }
361}
362
363pub struct SynapseInprocess<T, C>
364where
365 C: Codec<T> + CodecName + Send + Sync + 'static,
366 T: Sync + Send + 'static,
367{
368 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
369 dendrite: Option<Dendrite<T, C>>,
370 _codec_marker: PhantomData<fn() -> &'static ()>,
371}
372
373impl<T, C> SynapseInprocess<T, C>
374where
375 C: Codec<T> + CodecName + Send + Sync + 'static,
376 T: Sync + Send + 'static,
377{
378 pub fn new(
379 neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
380 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
381 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
382 ) -> Self {
383 let dendrite = if !reactants.is_empty() || !error_reactants.is_empty() {
384 Some(Dendrite::new(neuron.clone(), reactants, error_reactants))
385 } else {
386 None
387 };
388 Self {
389 neuron,
390 dendrite,
391 _codec_marker: PhantomData,
392 }
393 }
394
395 pub fn from_erased(
398 neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
399 reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
400 ) -> Option<Self>
401 where
402 T: 'static,
403 C: 'static,
404 {
405 use std::any::TypeId;
406
407 let neuron_type_id = neuron.payload_type_id();
409 let codec_type_id = neuron.codec_type_id();
410
411 if neuron_type_id != TypeId::of::<T>() || codec_type_id != TypeId::of::<C>() {
412 return None;
413 }
414
415 let typed_neuron = match neuron.as_any().downcast_ref::<NeuronErasedWrapper<T, C>>() {
417 Some(wrapper) => wrapper.get_typed_neuron(),
418 None => return None,
419 };
420
421 let typed_reactants: Vec<_> = reactants
423 .into_iter()
424 .filter_map(|erased_reactant| {
425 if erased_reactant.payload_type_id() != TypeId::of::<T>()
427 || erased_reactant.codec_type_id() != TypeId::of::<C>()
428 {
429 return None;
430 }
431
432 let any_arc = erased_reactant.clone_to_any();
434 any_arc
435 .downcast::<Arc<dyn Reactant<T, C> + Send + Sync + 'static>>()
436 .ok()
437 .map(|boxed_arc| (*boxed_arc).clone())
438 })
439 .collect();
440
441 Some(Self::new(typed_neuron.clone(), typed_reactants, vec![]))
443 }
444}
445
446impl<T, C> SynapseInternal<T, C> for SynapseInprocess<T, C>
447where
448 C: Codec<T> + CodecName + Send + Sync + 'static,
449 T: Sync + Send + 'static,
450{
451 fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync> {
452 self.neuron.clone()
453 }
454
455 fn transduce(
456 &self,
457 payload: Arc<Payload<T, C>>,
458 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
459 let span = payload.span_debug("SynapseInprocess::transduce");
460 let neuron_name = self.neuron.name();
461 match &self.dendrite {
462 Some(dendrite) => {
463 let future = dendrite.transduce(payload);
464 Box::pin(
465 async move {
466 tracing::debug!(neuron = %neuron_name, "SynapseInprocess::transduce calling dendrite.transduce");
467 future.await.map_err(SynapseError::from)
468 }
469 .instrument(span),
470 )
471 }
472 None => Box::pin(
473 async move {
474 tracing::debug!(
475 neuron = %neuron_name,
476 "SynapseInprocess::transduce no dendrite, returning empty vec"
477 );
478 Ok(vec![])
479 }
480 .instrument(span),
481 ),
482 }
483 }
484
485 fn transmit(
486 &self,
487 payload: Arc<Payload<T, C>>,
488 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
489 let span = payload.span_debug("SynapseInprocess::transmit");
490 let future = self.transduce(payload);
492 Box::pin(
493 async move {
494 tracing::debug!("SynapseInprocess::transmit called");
495 future.await
496 }
497 .instrument(span),
498 )
499 }
500
501 fn react(
502 &mut self,
503 reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
504 error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
505 ) -> Result<(), SynapseError> {
506 if reactants.is_empty() && error_reactants.is_empty() {
507 return Ok(());
508 }
509
510 match &self.dendrite {
511 Some(dendrite) => {
512 dendrite.add_reactants(reactants)?;
514 dendrite.add_error_reactants(error_reactants)?;
515 }
516 None => {
517 self.dendrite = Some(Dendrite::new(
519 self.neuron.clone(),
520 reactants,
521 error_reactants,
522 ));
523 }
524 }
525 Ok(())
526 }
527}
528
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logging::TraceContext;
    use crate::neuron::NeuronImpl;
    use crate::payload::PayloadRaw;
    use crate::test_utils::{
        DebugCodec, DebugStruct, SynapseExternalInprocess, TokioMpscReactant, TokioMpscReactantRaw,
        test_namespace,
    };
    use tokio::sync::mpsc::channel;
    use uuid::Uuid;

    /// Two reactants share one channel, so every transmitted payload must
    /// arrive twice with its value, correlation id and span id intact.
    #[tokio::test]
    async fn test_synapse_inprocess_transmit() {
        let ns = test_namespace();

        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);

        let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> = vec![
            Arc::new(TokioMpscReactant { sender: tx.clone() }),
            Arc::new(TokioMpscReactant { sender: tx.clone() }),
        ];
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> = Arc::new(neuron_impl);
        let synapse = SynapseInprocess::new(neuron.clone(), reactants, vec![]);

        let debug_struct = Arc::new(DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        });
        let correlation_id = Uuid::now_v7();
        let span_id = Uuid::now_v7().as_u128() as u64;
        // Fail loudly if transmit itself errors instead of discarding the result.
        synapse
            .transmit(
                Payload::builder()
                    .value((*debug_struct).clone())
                    .correlation_id(correlation_id)
                    .neuron(neuron.clone())
                    .span_id(span_id)
                    .build()
                    .unwrap(),
            )
            .await
            .expect("first transmit should succeed");

        assert_eq!(rx.len(), 2);

        let p = rx.recv().await.unwrap();
        assert_eq!(p.value, debug_struct);
        assert_eq!(p.correlation_id(), correlation_id);
        assert_eq!(p.span_id(), span_id);
        assert_eq!(rx.len(), 1);
        let p2 = rx.recv().await.unwrap();
        assert_eq!(p2.value, debug_struct);
        assert_eq!(p2.correlation_id(), correlation_id);
        assert_eq!(p2.span_id(), span_id);
        assert_eq!(rx.len(), 0);

        let debug_struct_2 = Arc::new(DebugStruct {
            foo: 49,
            bar: "foo_bar".to_owned(),
        });
        let correlation_id_2 = Uuid::now_v7();
        let span_id_2 = Uuid::now_v7().as_u128() as u64;
        synapse
            .transmit(
                Payload::builder()
                    .value((*debug_struct_2).clone())
                    .correlation_id(correlation_id_2)
                    .neuron(neuron.clone())
                    .span_id(span_id_2)
                    .build()
                    .unwrap(),
            )
            .await
            .expect("second transmit should succeed");

        let p3 = rx.recv().await.unwrap();
        assert_eq!(p3.value, debug_struct_2);
        assert_eq!(p3.correlation_id(), correlation_id_2);
        assert_eq!(p3.span_id(), span_id_2);
        assert_eq!(rx.len(), 1);
        let p4 = rx.recv().await.unwrap();
        assert_eq!(p4.value, debug_struct_2);
        assert_eq!(p4.correlation_id(), correlation_id_2);
        assert_eq!(p4.span_id(), span_id_2);
        assert_eq!(rx.len(), 0);
    }

    /// A synapse built without reactants has no dendrite; transmit must
    /// still succeed and yield an empty result vector.
    #[tokio::test]
    async fn test_synapse_inprocess_with_none_reactants() {
        let ns = test_namespace();
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> = Arc::new(neuron_impl);

        let synapse = SynapseInprocess::new(neuron.clone(), vec![], vec![]);

        let debug_struct = Arc::new(DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        });
        let correlation_id = Uuid::now_v7();
        let span_id = Uuid::now_v7().as_u128() as u64;

        let result = synapse
            .transmit(
                Payload::builder()
                    .value((*debug_struct).clone())
                    .correlation_id(correlation_id)
                    .neuron(neuron.clone())
                    .span_id(span_id)
                    .build()
                    .unwrap(),
            )
            .await;

        assert_eq!(
            result.expect("Should succeed").len(),
            0,
            "Should return empty vector when dendrite is None"
        );
    }

    /// Same as above for the external (raw payload) test synapse: with no
    /// reactants both result vectors must be empty.
    #[tokio::test]
    async fn test_synapse_external_with_none_reactants() {
        let ns = test_namespace();
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> =
            Arc::new(neuron_impl.clone());

        let synapse = SynapseExternalInprocess::new(neuron.clone(), vec![], vec![], vec![]);

        let debug_struct_value = DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        };
        let debug_struct_arc = Arc::new(debug_struct_value);
        let correlation_id = Uuid::now_v7();
        let direct_neuron_encoder: NeuronImpl<DebugStruct, DebugCodec> =
            NeuronImpl::new(ns.clone());
        let encoded = direct_neuron_encoder
            .encode(debug_struct_arc.as_ref())
            .expect("Failed to encode");
        let span_id = Uuid::now_v7().as_u128() as u64;

        let result = synapse
            .transmit(Arc::new(PayloadRaw {
                value: Arc::new(encoded.clone()),
                neuron: Some(neuron.clone()),
                trace: TraceContext::from_parts(correlation_id, span_id, None),
            }))
            .await
            .expect("Should succeed");

        assert_eq!(
            result.0.len(),
            0,
            "Should return empty vector for reactants when dendrite_decoder is None"
        );
        assert_eq!(
            result.1.len(),
            0,
            "Should return empty vector for raw_reactants when dendrite_decoder is None"
        );
    }

    /// A raw payload must reach both the typed reactant (decoded) and the
    /// raw reactant (bytes unchanged), keeping its trace identifiers.
    #[tokio::test]
    async fn test_synapse_external_inprocess_transmit() {
        let ns = test_namespace();

        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
        let (tx_raw, mut rx_raw) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(1);

        let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
            vec![Arc::new(TokioMpscReactant { sender: tx.clone() })];
        let raw_reactants: Vec<Arc<dyn ReactantRaw<DebugStruct, DebugCodec> + Send + Sync>> =
            vec![Arc::new(TokioMpscReactantRaw {
                sender: tx_raw.clone(),
            })];
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> =
            Arc::new(neuron_impl.clone());
        let synapse =
            SynapseExternalInprocess::new(neuron.clone(), reactants, raw_reactants, vec![]);

        let debug_struct_value = DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        };
        let debug_struct_arc = Arc::new(debug_struct_value);

        let correlation_id = Uuid::now_v7();
        let direct_neuron_encoder: NeuronImpl<DebugStruct, DebugCodec> =
            NeuronImpl::new(ns.clone());
        let encoded = direct_neuron_encoder
            .encode(debug_struct_arc.as_ref())
            .expect("Failed to encode");

        let span_id = Uuid::now_v7().as_u128() as u64;

        // Fail loudly if transmit itself errors instead of discarding the result.
        synapse
            .transmit(Arc::new(PayloadRaw {
                value: Arc::new(encoded.clone()),
                neuron: Some(neuron.clone()),
                trace: TraceContext::from_parts(correlation_id, span_id, None),
            }))
            .await
            .expect("first transmit should succeed");

        let p = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
            .await
            .expect("Timeout rx1")
            .expect("Closed rx1");
        assert_eq!(p.value, debug_struct_arc);
        assert_eq!(p.correlation_id(), correlation_id);
        assert_eq!(p.span_id(), span_id);

        let p_raw = tokio::time::timeout(std::time::Duration::from_millis(100), rx_raw.recv())
            .await
            .expect("Timeout raw_rx1")
            .expect("Closed raw_rx1");
        assert_eq!(p_raw.value.as_slice(), encoded.as_slice());
        assert_eq!(p_raw.correlation_id(), correlation_id);
        assert_eq!(p_raw.span_id(), span_id);

        let debug_struct_2_value = DebugStruct {
            foo: 49,
            bar: "foo_bar".to_owned(),
        };
        let debug_struct_2_arc = Arc::new(debug_struct_2_value);
        let correlation_id_2 = Uuid::now_v7();
        let encoded_2 = direct_neuron_encoder
            .encode(debug_struct_2_arc.as_ref())
            .expect("Failed to encode");
        let span_id_2 = Uuid::now_v7().as_u128() as u64;
        synapse
            .transmit(Arc::new(PayloadRaw {
                value: Arc::new(encoded_2.clone()),
                neuron: Some(neuron.clone()),
                trace: TraceContext::from_parts(correlation_id_2, span_id_2, None),
            }))
            .await
            .expect("second transmit should succeed");

        let p2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
            .await
            .expect("Timeout rx2")
            .expect("Closed rx2");
        assert_eq!(p2.value, debug_struct_2_arc);
        assert_eq!(p2.correlation_id(), correlation_id_2);
        assert_eq!(p2.span_id(), span_id_2);

        let p_raw2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx_raw.recv())
            .await
            .expect("Timeout raw_rx2")
            .expect("Closed raw_rx2");
        assert_eq!(p_raw2.value.as_slice(), encoded_2.as_slice());
        assert_eq!(p_raw2.correlation_id(), correlation_id_2);
        assert_eq!(p_raw2.span_id(), span_id_2);
    }
}