Skip to main content

plexor_core/
synapse.rs

1// Copyright 2025 Alecks Gates
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use crate::backpressure::{BackpressureConfig, BackpressureQueue};
8use crate::codec::{Codec, CodecName};
9use crate::dendrite::{Dendrite, DendriteError};
10use crate::erasure::neuron::{NeuronErased, NeuronErasedWrapper};
11use crate::erasure::reactant::ReactantErased;
12use crate::logging::LogTrace;
13use crate::neuron::Neuron;
14use crate::payload::{Payload, PayloadRaw};
15use crate::reactant::{ErrorReactant, Reactant, ReactantRaw};
16use std::future::Future;
17use std::marker::PhantomData;
18use std::pin::Pin;
19use std::sync::Arc;
20use parking_lot::RwLock;
21use thiserror::Error;
22
23#[derive(Error, Debug)]
24pub enum SynapseError {
25    #[error("Queue for neuron '{neuron_name}' is full")]
26    QueueFull { neuron_name: String },
27    #[error(transparent)]
28    Dendrite(#[from] DendriteError),
29    #[error("Type conversion failed for neuron '{neuron_name}'")]
30    NeuronTypeConversion { neuron_name: String },
31    #[error("Type conversion failed for reactant in neuron '{neuron_name}'")]
32    ReactantTypeConversion { neuron_name: String },
33    #[error("No dendrite available for processing in neuron '{neuron_name}'")]
34    NoDendrite { neuron_name: String },
35}
36
37use tracing::Instrument;
38
39pub trait SynapseInternal<T, C>
40where
41    C: Codec<T> + CodecName + Send + Sync + 'static,
42    T: Send + Sync + 'static,
43{
44    fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync>;
45    fn transduce(
46        &self,
47        payload: Arc<Payload<T, C>>,
48    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>>;
49    fn transmit(
50        &self,
51        payload: Arc<Payload<T, C>>,
52    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>>;
53    fn react(
54        &mut self,
55        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
56        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
57    ) -> Result<(), SynapseError>;
58}
59
60pub trait SynapseExternal<T, C>
61where
62    C: Codec<T> + CodecName + Send + Sync + 'static,
63    T: Send + Sync + 'static,
64{
65    fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync>;
66    fn transduce(
67        &self,
68        payload: Arc<PayloadRaw<T, C>>,
69    ) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static;
70    fn transmit(
71        &self,
72        payload: Arc<PayloadRaw<T, C>>,
73    ) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static;
74    fn react(
75        &mut self,
76        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
77        raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
78        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
79    ) -> Result<(), SynapseError>;
80}
81
/// Abstraction over a transport that can ship raw bytes to a topic.
///
/// Implementors must be `Send + Sync` so a sender can be shared across tasks.
pub trait RawSender: Send + Sync {
    /// Sends `data` to `topic`.
    ///
    /// The returned future resolves to `Ok(())` on success or to a
    /// human-readable error message on failure.
    fn send(
        &self,
        topic: &str,
        data: Vec<u8>,
    ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
}
90
91/// A wrapper for RawSender that applies backpressure buffering.
92pub struct BackpressureSender<S: RawSender> {
93    queue: Arc<BackpressureQueue<(String, Vec<u8>)>>,
94    _marker: PhantomData<S>,
95}
96
97impl<S: RawSender + 'static> BackpressureSender<S> {
98    pub fn new(inner: S, config: BackpressureConfig, neuron_name: String) -> Self {
99        let inner_arc = Arc::new(inner);
100        let inner_clone = inner_arc.clone();
101
102        let queue = BackpressureQueue::<(String, Vec<u8>)>::new(
103            neuron_name,
104            config,
105            move |(topic, data)| {
106                let s = inner_clone.clone();
107                async move {
108                    if let Err(e) = s.send(&topic, data).await {
109                        eprintln!("BackpressureSender failed to send: {}", e);
110                    }
111                }
112            },
113        );
114
115        Self {
116            queue: Arc::new(queue),
117            _marker: PhantomData,
118        }
119    }
120}
121
122impl<S: RawSender + 'static> RawSender for BackpressureSender<S> {
123    fn send(
124        &self,
125        topic: &str,
126        data: Vec<u8>,
127    ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> {
128        let q = self.queue.clone();
129        let topic = topic.to_string();
130        Box::pin(async move {
131            q.push((topic, data)).await.map_err(|e| e.to_string())
132        })
133    }
134}
135
136/// A wrapper for external synapses that adds backpressure management.
137pub struct BackpressureExternalSynapse<T, C, S>
138where
139    C: Codec<T> + CodecName + Send + Sync + 'static,
140    T: Send + Sync + 'static,
141    S: SynapseExternal<T, C> + Send + Sync + 'static,
142{
143    inner: Arc<RwLock<S>>,
144    neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
145    queue: Arc<BackpressureQueue<Arc<PayloadRaw<T, C>>>>,
146    _phantom: PhantomData<(T, C)>,
147}
148
149impl<T, C, S> BackpressureExternalSynapse<T, C, S>
150where
151    C: Codec<T> + CodecName + Send + Sync + 'static,
152    T: Send + Sync + 'static,
153    S: SynapseExternal<T, C> + Send + Sync + 'static,
154{
155    pub fn new(synapse: S, config: BackpressureConfig) -> Self {
156        let neuron = synapse.neuron();
157        let neuron_name = neuron.name();
158        let synapse_arc = Arc::new(RwLock::new(synapse));
159        let inner_clone = synapse_arc.clone();
160
161        let queue = BackpressureQueue::new(
162            neuron_name,
163            config,
164            move |payload: Arc<PayloadRaw<T, C>>| {
165                let s = inner_clone.clone();
166                async move {
167                    let future = {
168                        let guard = s.read();
169                        guard.transmit(payload)
170                    };
171                    let _ = future.await;
172                }
173            },
174        );
175
176        Self {
177            inner: synapse_arc,
178            neuron,
179            queue: Arc::new(queue),
180            _phantom: PhantomData,
181        }
182    }
183}
184
185impl<T, C, S> SynapseExternal<T, C> for BackpressureExternalSynapse<T, C, S>
186where
187    C: Codec<T> + CodecName + Send + Sync + 'static,
188    T: Send + Sync + 'static,
189    S: SynapseExternal<T, C> + Send + Sync + 'static,
190{
191    fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync> {
192        self.neuron.clone()
193    }
194
195    fn transduce(
196        &self,
197        payload: Arc<PayloadRaw<T, C>>,
198    ) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static {
199        self.transmit(payload)
200    }
201
202    fn transmit(
203        &self,
204        payload: Arc<PayloadRaw<T, C>>,
205    ) -> impl Future<Output = Result<(Vec<()>, Vec<()>), SynapseError>> + Send + 'static {
206        let q = self.queue.clone();
207        Box::pin(async move {
208            q.push(payload).await?;
209            Ok((vec![], vec![]))
210        })
211    }
212
213    fn react(
214        &mut self,
215        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
216        raw_reactants: Vec<Arc<dyn ReactantRaw<T, C> + Send + Sync>>,
217        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
218    ) -> Result<(), SynapseError> {
219        let mut guard = self.inner.write();
220        guard.react(reactants, raw_reactants, error_reactants)
221    }
222}
223
224pub struct SynapseInprocess<T, C>
225where
226    C: Codec<T> + CodecName + Send + Sync + 'static,
227    T: Sync + Send + 'static,
228{
229    neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
230    dendrite: Option<Dendrite<T, C>>,
231    _codec_marker: PhantomData<fn() -> &'static ()>,
232}
233
234impl<T, C> SynapseInprocess<T, C>
235where
236    C: Codec<T> + CodecName + Send + Sync + 'static,
237    T: Sync + Send + 'static,
238{
239    pub fn new(
240        neuron: Arc<dyn Neuron<T, C> + Send + Sync>,
241        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
242        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
243    ) -> Self {
244        let dendrite = if !reactants.is_empty() || !error_reactants.is_empty() {
245            Some(Dendrite::new(neuron.clone(), reactants, error_reactants))
246        } else {
247            None
248        };
249        Self {
250            neuron,
251            dendrite,
252            _codec_marker: PhantomData,
253        }
254    }
255
256    /// Create a new SynapseInprocess from type-erased neuron and reactants
257    /// Returns None if the types don't match
258    pub fn from_erased(
259        neuron: Arc<dyn NeuronErased + Send + Sync + 'static>,
260        reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>>,
261    ) -> Option<Self>
262    where
263        T: 'static,
264        C: 'static,
265    {
266        use std::any::TypeId;
267
268        // First, check if the neuron's type parameters match what we need
269        let neuron_type_id = neuron.payload_type_id();
270        let codec_type_id = neuron.codec_type_id();
271
272        if neuron_type_id != TypeId::of::<T>() || codec_type_id != TypeId::of::<C>() {
273            return None;
274        }
275
276        // Try to convert the neuron to the correct type safely using Any downcast
277        let typed_neuron = match neuron.as_any().downcast_ref::<NeuronErasedWrapper<T, C>>() {
278            Some(wrapper) => wrapper.get_typed_neuron(),
279            None => return None,
280        };
281
282        // Convert reactants
283        let typed_reactants: Vec<_> = reactants
284            .into_iter()
285            .filter_map(|erased_reactant| {
286                // Check if this reactant's type parameters match
287                if erased_reactant.payload_type_id() != TypeId::of::<T>()
288                    || erased_reactant.codec_type_id() != TypeId::of::<C>()
289                {
290                    return None;
291                }
292
293                // Try to convert to the correct type using safe Any downcast
294                let any_arc = erased_reactant.clone_to_any();
295                any_arc
296                    .downcast::<Arc<dyn Reactant<T, C> + Send + Sync + 'static>>()
297                    .ok()
298                    .map(|boxed_arc| (*boxed_arc).clone())
299            })
300            .collect();
301
302        // Create a new SynapseInprocess with the typed objects
303        Some(Self::new(typed_neuron.clone(), typed_reactants, vec![]))
304    }
305}
306
307impl<T, C> SynapseInternal<T, C> for SynapseInprocess<T, C>
308where
309    C: Codec<T> + CodecName + Send + Sync + 'static,
310    T: Sync + Send + 'static,
311{
312    fn neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync> {
313        self.neuron.clone()
314    }
315
316    fn transduce(
317        &self,
318        payload: Arc<Payload<T, C>>,
319    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
320        let span = payload.span_debug("SynapseInprocess::transduce");
321        match &self.dendrite {
322            Some(dendrite) => {
323                let future = dendrite.transduce(payload);
324                Box::pin(
325                    async move {
326                        tracing::debug!("SynapseInprocess::transduce calling dendrite.transduce");
327                        future.await.map_err(SynapseError::from)
328                    }
329                    .instrument(span),
330                )
331            }
332            None => Box::pin(
333                async move {
334                    tracing::debug!("SynapseInprocess::transduce no dendrite, returning empty vec");
335                    Ok(vec![])
336                }
337                .instrument(span),
338            ),
339        }
340    }
341
342    fn transmit(
343        &self,
344        payload: Arc<Payload<T, C>>,
345    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, SynapseError>> + Send + 'static>> {
346        let span = payload.span_debug("SynapseInprocess::transmit");
347        // This is an internal synapse so transmit directly to any reactants we have
348        let future = self.transduce(payload);
349        Box::pin(
350            async move {
351                tracing::debug!("SynapseInprocess::transmit called");
352                future.await
353            }
354            .instrument(span),
355        )
356    }
357
358    fn react(
359        &mut self,
360        reactants: Vec<Arc<dyn Reactant<T, C> + Send + Sync>>,
361        error_reactants: Vec<Arc<dyn ErrorReactant<T, C> + Send + Sync>>,
362    ) -> Result<(), SynapseError> {
363        if reactants.is_empty() && error_reactants.is_empty() {
364            return Ok(());
365        }
366
367        match &self.dendrite {
368            Some(dendrite) => {
369                // Add reactants to existing dendrite
370                dendrite.add_reactants(reactants)?;
371                dendrite.add_error_reactants(error_reactants)?;
372            }
373            None => {
374                // Create a new dendrite with the stored neuron and reactants
375                self.dendrite = Some(Dendrite::new(
376                    self.neuron.clone(),
377                    reactants,
378                    error_reactants,
379                ));
380            }
381        }
382        Ok(())
383    }
384}
385
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logging::TraceContext;
    use crate::neuron::NeuronImpl;
    use crate::payload::PayloadRaw;
    use crate::test_utils::{
        DebugCodec, DebugStruct, SynapseExternalInprocess, TokioMpscReactant, TokioMpscReactantRaw,
        test_namespace,
    };
    use tokio::sync::mpsc::{Receiver, channel};
    use uuid::Uuid;

    /// Neuron trait object used by every test in this module.
    type TestNeuron = Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync>;
    /// Decoded payload as delivered to typed reactants.
    type TestPayload = Arc<Payload<DebugStruct, DebugCodec>>;
    /// Encoded payload as delivered to raw reactants.
    type TestPayloadRaw = Arc<PayloadRaw<DebugStruct, DebugCodec>>;

    /// Produces a span id from the low 64 bits of a fresh v7 UUID.
    fn fresh_span_id() -> u64 {
        Uuid::now_v7().as_u128() as u64
    }

    /// Builds a decoded payload carrying a copy of `value`.
    fn typed_payload(
        value: &DebugStruct,
        correlation_id: Uuid,
        span_id: u64,
        neuron: &TestNeuron,
    ) -> TestPayload {
        Payload::builder()
            .value(value.clone())
            .correlation_id(correlation_id)
            .neuron(neuron.clone())
            .span_id(span_id)
            .build()
            .unwrap()
    }

    /// Receives one message within 100 ms, panicking with the given messages
    /// on timeout or on a closed channel.
    async fn recv_within<M>(rx: &mut Receiver<M>, on_timeout: &str, on_closed: &str) -> M {
        tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
            .await
            .expect(on_timeout)
            .expect(on_closed)
    }

    #[tokio::test]
    async fn test_synapse_inprocess_transmit() {
        let ns = test_namespace();

        let (tx, mut rx) = channel::<TestPayload>(10);

        // Two reactants share one channel, so each transmit yields two messages.
        let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> = vec![
            Arc::new(TokioMpscReactant { sender: tx.clone() }),
            Arc::new(TokioMpscReactant { sender: tx.clone() }),
        ];
        let neuron: TestNeuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(ns.clone()));
        let synapse = SynapseInprocess::new(neuron.clone(), reactants, vec![]);

        // First transmit.
        let first = Arc::new(DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        });
        let first_correlation = Uuid::now_v7();
        let first_span = fresh_span_id();
        let _ = synapse
            .transmit(typed_payload(&first, first_correlation, first_span, &neuron))
            .await;

        assert_eq!(rx.len(), 2);
        for remaining in [1, 0] {
            let received = rx.recv().await.unwrap();
            assert_eq!(received.value, first);
            assert_eq!(received.correlation_id(), first_correlation);
            assert_eq!(received.span_id(), first_span);
            assert_eq!(rx.len(), remaining);
        }

        // Second transmit with different contents and ids.
        let second = Arc::new(DebugStruct {
            foo: 49,
            bar: "foo_bar".to_owned(),
        });
        let second_correlation = Uuid::now_v7();
        let second_span = fresh_span_id();
        let _ = synapse
            .transmit(typed_payload(&second, second_correlation, second_span, &neuron))
            .await;

        for remaining in [1, 0] {
            let received = rx.recv().await.unwrap();
            assert_eq!(received.value, second);
            assert_eq!(received.correlation_id(), second_correlation);
            assert_eq!(received.span_id(), second_span);
            assert_eq!(rx.len(), remaining);
        }
    }

    #[tokio::test]
    async fn test_synapse_inprocess_with_none_reactants() {
        let ns = test_namespace();
        let neuron: TestNeuron = Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(ns.clone()));

        // No reactants of either kind, so no dendrite is created.
        let synapse = SynapseInprocess::new(neuron.clone(), vec![], vec![]);

        let sample = Arc::new(DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        });
        let correlation_id = Uuid::now_v7();
        let span_id = fresh_span_id();

        // Without a dendrite the transmit resolves to an empty vector.
        let outcome = synapse
            .transmit(typed_payload(&sample, correlation_id, span_id, &neuron))
            .await;

        assert_eq!(
            outcome.expect("Should succeed").len(),
            0,
            "Should return empty vector when dendrite is None"
        );
    }

    #[tokio::test]
    async fn test_synapse_external_with_none_reactants() {
        let ns = test_namespace();
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron: TestNeuron = Arc::new(neuron_impl.clone());

        // No reactants, raw reactants or error reactants.
        let synapse = SynapseExternalInprocess::new(neuron.clone(), vec![], vec![], vec![]);

        let sample = Arc::new(DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        });
        let correlation_id = Uuid::now_v7();
        let encoder: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let encoded = encoder.encode(sample.as_ref()).expect("Failed to encode");
        let span_id = fresh_span_id();

        // With nothing registered, both result vectors are expected to be empty.
        let (typed_results, raw_results) = synapse
            .transmit(Arc::new(PayloadRaw {
                value: Arc::new(encoded.clone()),
                neuron: neuron.clone(),
                trace: TraceContext::from_parts(correlation_id, span_id, None),
            }))
            .await
            .expect("Should succeed");

        assert_eq!(
            typed_results.len(),
            0,
            "Should return empty vector for reactants when dendrite_decoder is None"
        );
        assert_eq!(
            raw_results.len(),
            0,
            "Should return empty vector for raw_reactants when dendrite_decoder is None"
        );
    }

    #[tokio::test]
    async fn test_synapse_external_inprocess_transmit() {
        let ns = test_namespace();

        let (tx, mut rx) = channel::<TestPayload>(1);
        let (tx_raw, mut rx_raw) = channel::<TestPayloadRaw>(1);

        let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
            vec![Arc::new(TokioMpscReactant { sender: tx.clone() })];
        let raw_reactants: Vec<Arc<dyn ReactantRaw<DebugStruct, DebugCodec> + Send + Sync>> =
            vec![Arc::new(TokioMpscReactantRaw {
                sender: tx_raw.clone(),
            })];
        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let neuron: TestNeuron = Arc::new(neuron_impl.clone());
        let synapse =
            SynapseExternalInprocess::new(neuron.clone(), reactants, raw_reactants, vec![]);

        // First message: encode, transmit, then check both the typed and raw deliveries.
        let first = Arc::new(DebugStruct {
            foo: 42,
            bar: "test_value".to_owned(),
        });

        let first_correlation = Uuid::now_v7();
        let encoder: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
        let first_encoded = encoder.encode(first.as_ref()).expect("Failed to encode");

        let first_span = fresh_span_id();

        let _ = synapse
            .transmit(Arc::new(PayloadRaw {
                value: Arc::new(first_encoded.clone()),
                neuron: neuron.clone(),
                trace: TraceContext::from_parts(first_correlation, first_span, None),
            }))
            .await;

        let typed_1 = recv_within(&mut rx, "Timeout rx1", "Closed rx1").await;
        assert_eq!(typed_1.value, first);
        assert_eq!(typed_1.correlation_id(), first_correlation);
        assert_eq!(typed_1.span_id(), first_span);

        let raw_1 = recv_within(&mut rx_raw, "Timeout raw_rx1", "Closed raw_rx1").await;
        assert_eq!(raw_1.value.as_slice(), first_encoded.as_slice());
        assert_eq!(raw_1.correlation_id(), first_correlation);
        assert_eq!(raw_1.span_id(), first_span);

        // Second message with different contents and ids.
        let second = Arc::new(DebugStruct {
            foo: 49,
            bar: "foo_bar".to_owned(),
        });
        let second_correlation = Uuid::now_v7();
        let second_encoded = encoder.encode(second.as_ref()).expect("Failed to encode");
        let second_span = fresh_span_id();
        let _ = synapse
            .transmit(Arc::new(PayloadRaw {
                value: Arc::new(second_encoded.clone()),
                neuron: neuron.clone(),
                trace: TraceContext::from_parts(second_correlation, second_span, None),
            }))
            .await;

        let typed_2 = recv_within(&mut rx, "Timeout rx2", "Closed rx2").await;
        assert_eq!(typed_2.value, second);
        assert_eq!(typed_2.correlation_id(), second_correlation);
        assert_eq!(typed_2.span_id(), second_span);

        let raw_2 = recv_within(&mut rx_raw, "Timeout raw_rx2", "Closed raw_rx2").await;
        assert_eq!(raw_2.value.as_slice(), second_encoded.as_slice());
        assert_eq!(raw_2.correlation_id(), second_correlation);
        assert_eq!(raw_2.span_id(), second_span);
    }
}