Skip to main content

plexor_core/
axon.rs

1// Copyright 2025 Alecks Gates
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7//! Axon module for handling neuron-ganglion interactions with type erasure.
8//!
9//! This module provides the Axon trait and AxonImpl struct that manage the interface
10//! between neurons and ganglia, handling type erasure and providing a clean API
11//! for neuron operations.
12
13use crate::codec::{Codec, CodecName};
14use crate::erasure::payload::erase_payload;
15use crate::erasure::reactant::ReactantErased;
16use crate::ganglion::{GanglionError, GanglionInternal};
17use crate::logging::TraceContext;
18use crate::neuron::Neuron;
19use crate::payload::Payload;
20use crate::reactant::Reactant;
21use std::future::Future;
22use std::pin::Pin;
23use std::sync::Arc;
24use std::time::Duration;
25use thiserror::Error;
26use tokio::sync::Mutex;
27use tokio::time::timeout;
28use tracing::Instrument;
29use uuid::Uuid;
30
31/// Error types for Axon operations
32#[derive(Error, Debug)]
33pub enum AxonError {
34    #[error(
35        "Neuron needs to be adapted to ganglion {ganglion_name} (id: {ganglion_id}): {neuron_name}"
36    )]
37    NeuronNotAdapted {
38        neuron_name: String,
39        ganglion_name: String,
40        ganglion_id: Uuid,
41    },
42    #[error(
43        "Failed to acquire lock on synapse: {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})"
44    )]
45    SynapseLock {
46        neuron_name: String,
47        ganglion_name: String,
48        ganglion_id: Uuid,
49    },
50    #[error(
51        "Transmission error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id}): {message}"
52    )]
53    Transmit {
54        neuron_name: String,
55        ganglion_name: String,
56        ganglion_id: Uuid,
57        message: String,
58    },
59    #[error("Encode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
60    Encode {
61        neuron_name: String,
62        ganglion_name: String,
63        ganglion_id: Uuid,
64    },
65    #[error("Decode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
66    Decode {
67        neuron_name: String,
68        ganglion_name: String,
69        ganglion_id: Uuid,
70    },
71    #[error("Transmission timeout")]
72    TransmissionTimeout,
73    #[error("Ganglion error: {0}")]
74    GanglionError(#[from] GanglionError),
75}
76
77impl AxonError {
78    /// Convert a GanglionError to AxonError, translating SynapseNotFound appropriately
79    pub fn from_ganglion_error(error: GanglionError) -> Self {
80        match error {
81            GanglionError::SynapseNotFound {
82                neuron_name,
83                ganglion_name,
84                ganglion_id,
85            } => AxonError::NeuronNotAdapted {
86                neuron_name,
87                ganglion_name,
88                ganglion_id,
89            },
90            GanglionError::SynapseLock {
91                neuron_name,
92                ganglion_name,
93                ganglion_id,
94            } => AxonError::SynapseLock {
95                neuron_name,
96                ganglion_name,
97                ganglion_id,
98            },
99            GanglionError::Transmit {
100                neuron_name,
101                ganglion_name,
102                ganglion_id,
103                message,
104            } => AxonError::Transmit {
105                neuron_name,
106                ganglion_name,
107                ganglion_id,
108                message,
109            },
110            GanglionError::Encode {
111                neuron_name,
112                ganglion_name,
113                ganglion_id,
114            } => AxonError::Encode {
115                neuron_name,
116                ganglion_name,
117                ganglion_id,
118            },
119            GanglionError::Decode {
120                neuron_name,
121                ganglion_name,
122                ganglion_id,
123            } => AxonError::Decode {
124                neuron_name,
125                ganglion_name,
126                ganglion_id,
127            },
128            GanglionError::Adapt { .. } | GanglionError::QueueFull { .. } => {
129                // For adaptation and queue full errors, use the generic GanglionError wrapper
130                AxonError::GanglionError(error)
131            }
132        }
133    }
134}
135
136/// Trait for axon operations with typed neurons and data
137pub trait Axon<T, C>: Send + Sync
138where
139    T: Send + Sync + 'static,
140    C: Codec<T> + CodecName + Send + Sync + 'static,
141{
142        /// React to incoming reactants using the neuron
143        fn react(
144            &mut self,
145            reactants: Vec<Box<dyn Reactant<T, C> + Send + Sync + 'static>>,
146        ) -> Pin<Box<dyn Future<Output = Result<(), AxonError>> + Send + '_>>;
147     
148        /// Transmit data through the neuron and ganglion
149        ///
150        /// # Examples
151        ///
152        /// ```rust
153        /// # use std::sync::Arc;
154        /// # use tokio::sync::Mutex;
155        /// # use plexor_core::axon::{Axon, AxonImpl};
156        /// # use plexor_core::neuron::{Neuron, NeuronImpl};
157        /// # use plexor_core::plexus::Plexus;
158        /// # use plexor_core::namespace::NamespaceImpl;
159        /// # use plexor_core::ganglion::Ganglion;
160        /// # use plexor_core::codec::{Codec, CodecError, CodecName};
161        /// #
162        /// # #[derive(Debug, Clone)]
163        /// # struct Dummy;
164        /// # impl CodecName for Dummy { fn name() -> &'static str { "dummy" } }
165        /// # impl Codec<Dummy> for Dummy {
166        /// #     fn encode(_: &Dummy) -> Result<Vec<u8>, CodecError> { Ok(vec![]) }
167        /// #     fn decode(_: &[u8]) -> Result<Dummy, CodecError> { Ok(Dummy) }
168        /// # }
169        /// #
170        /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
171        /// let ns = Arc::new(NamespaceImpl { delimiter: ".", parts: vec!["test"] });
172        /// let neuron = Arc::new(NeuronImpl::<Dummy, Dummy>::new(ns));
173        /// let plexus = Plexus::new_shared(vec![], vec![]).await;
174        /// plexus.lock().await.adapt(neuron.clone()).await.unwrap();
175        ///
176        /// let mut axon = AxonImpl::new(neuron, plexus);
177        /// axon.transmit(Dummy).await.unwrap();
178        /// # });
179        /// ```
180        fn transmit(
181            &mut self,
182            data: T,
183        ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;
184    /// Transmit data with explicit tracing context.
185    fn transmit_with_trace(
186        &mut self,
187        data: T,
188        trace_context: TraceContext,
189    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;
190
191    /// Transmit data through the neuron and ganglion using type erasure
192    fn transmit_erased(
193        &mut self,
194        data: T,
195    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;
196
197    /// Transmit data using type erasure with explicit tracing context.
198    fn transmit_erased_with_trace(
199        &mut self,
200        data: T,
201        trace_context: TraceContext,
202    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;
203
204    /// Get the neuron name
205    fn neuron_name(&self) -> String;
206
207    /// Transmit data with a timeout
208    fn transmit_with_timeout(
209        &mut self,
210        data: T,
211        timeout_duration: Duration,
212    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
213        Box::pin(async move {
214            match timeout(timeout_duration, self.transmit(data)).await {
215                Ok(result) => result,
216                Err(_) => Err(AxonError::TransmissionTimeout),
217            }
218        })
219    }
220
221    /// Transmit data with a trace and timeout
222    fn transmit_with_trace_with_timeout(
223        &mut self,
224        data: T,
225        trace_context: TraceContext,
226        timeout_duration: Duration,
227    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
228        Box::pin(async move {
229            match timeout(timeout_duration, self.transmit_with_trace(data, trace_context)).await {
230                Ok(result) => result,
231                Err(_) => Err(AxonError::TransmissionTimeout),
232            }
233        })
234    }
235
236    /// Transmit multiple data items in batch
237    #[allow(clippy::type_complexity)]
238    fn transmit_batch(
239        &mut self,
240        data_items: Vec<T>,
241    ) -> Pin<Box<dyn Future<Output = Result<Vec<Vec<()>>, AxonError>> + Send + '_>> {
242        Box::pin(async move {
243            let mut results = Vec::new();
244            for item in data_items {
245                match self.transmit(item).await {
246                    Ok(result) => results.push(result),
247                    Err(e) => return Err(e),
248                }
249            }
250            Ok(results)
251        })
252    }
253
254    /// Transmit multiple data items in batch with trace
255    #[allow(clippy::type_complexity)]
256    fn transmit_batch_with_trace(
257        &mut self,
258        data_items: Vec<T>,
259        trace_context: TraceContext,
260    ) -> Pin<Box<dyn Future<Output = Result<Vec<Vec<()>>, AxonError>> + Send + '_>> {
261        Box::pin(async move {
262            let mut results = Vec::new();
263            for item in data_items {
264                match self.transmit_with_trace(item, trace_context).await {
265                    Ok(result) => results.push(result),
266                    Err(e) => return Err(e),
267                }
268            }
269            Ok(results)
270        })
271    }
272
273    /// Check if the axon is ready for transmission (convenience method)
274    fn is_ready(&self) -> bool {
275        !self.neuron_name().is_empty()
276    }
277
278    /// Get a formatted status string for debugging
279    fn status(&self) -> String {
280        format!("Axon[neuron: {}]", self.neuron_name())
281    }
282}
283
284/// Implementation of Axon that stores a neuron and ganglion with type erasure handling
285///
286/// # Examples
287///
288/// ```rust
289/// # use std::sync::Arc;
290/// # use tokio::sync::Mutex;
291/// # use plexor_core::axon::{Axon, AxonImpl};
292/// # use plexor_core::neuron::{Neuron, NeuronImpl};
293/// # use plexor_core::ganglion::{Ganglion, GanglionInprocess};
294/// # use plexor_core::namespace::NamespaceImpl;
295/// # use plexor_core::codec::{Codec, CodecError, CodecName};
296/// #
297/// # #[derive(Debug, Clone)]
298/// # struct Dummy;
299/// # impl CodecName for Dummy { fn name() -> &'static str { "dummy" } }
300/// # impl Codec<Dummy> for Dummy {
301/// #     fn encode(_: &Dummy) -> Result<Vec<u8>, CodecError> { Ok(vec![]) }
302/// #     fn decode(_: &[u8]) -> Result<Dummy, CodecError> { Ok(Dummy) }
303/// # }
304/// #
305/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
306/// let ns = Arc::new(NamespaceImpl { delimiter: ".", parts: vec!["test"] });
307/// let neuron = Arc::new(NeuronImpl::<Dummy, Dummy>::new(ns));
308/// let ganglion = Arc::new(Mutex::new(GanglionInprocess::new()));
309///
310/// let mut axon = AxonImpl::new(neuron, ganglion);
311/// assert!(axon.is_ready());
312/// # });
313/// ```
314pub struct AxonImpl<T, C>
315where
316    T: Send + Sync + 'static,
317    C: Codec<T> + CodecName + Send + Sync + 'static,
318{
319    /// The neuron this axon manages
320    neuron: Arc<dyn Neuron<T, C> + Send + Sync + 'static>,
321
322    /// The ganglion this axon uses for operations
323    ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>,
324}
325
326impl<T, C> Clone for AxonImpl<T, C>
327where
328    T: Send + Sync + 'static,
329    C: Codec<T> + CodecName + Send + Sync + 'static,
330{
331    fn clone(&self) -> Self {
332        Self {
333            neuron: self.neuron.clone(),
334            ganglion: self.ganglion.clone(),
335        }
336    }
337}
338
339impl<T, C> AxonImpl<T, C>
340where
341    T: Send + Sync + 'static,
342    C: Codec<T> + CodecName + Send + Sync + 'static,
343{
344    /// Create a new AxonImpl with the given neuron and ganglion
345    pub fn new(
346        neuron: Arc<dyn Neuron<T, C> + Send + Sync + 'static>,
347        ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>,
348    ) -> Self {
349        Self { neuron, ganglion }
350    }
351
352    /// Create a builder for constructing an AxonImpl
353    pub fn builder() -> AxonBuilder<T, C> {
354        AxonBuilder::new()
355    }
356
357    /// Get a reference to the underlying neuron
358    pub fn neuron(&self) -> &Arc<dyn Neuron<T, C> + Send + Sync + 'static> {
359        &self.neuron
360    }
361
362    /// Get a reference to the underlying ganglion
363    pub fn ganglion(&self) -> &Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>> {
364        &self.ganglion
365    }
366
367    /// Clone the neuron for sharing
368    pub fn clone_neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync + 'static> {
369        self.neuron.clone()
370    }
371
372    /// Clone the ganglion for sharing
373    pub fn clone_ganglion(&self) -> Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>> {
374        self.ganglion.clone()
375    }
376
377    /// Get the unique ID of the underlying ganglion
378    pub async fn ganglion_id(&self) -> Uuid {
379        self.ganglion.lock().await.unique_id()
380    }
381
382    /// Check if the axon components are properly configured
383    pub fn validate(&self) -> Result<(), String> {
384        if self.neuron.name().is_empty() {
385            return Err("Neuron name is empty".to_string());
386        }
387        Ok(())
388    }
389}
390
391/// Builder for constructing AxonImpl instances with a fluent API
392pub struct AxonBuilder<T, C>
393where
394    T: Send + Sync + 'static,
395    C: Codec<T> + CodecName + Send + Sync + 'static,
396{
397    neuron: Option<Arc<dyn Neuron<T, C> + Send + Sync + 'static>>,
398    ganglion: Option<Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>,
399}
400
401impl<T, C> AxonBuilder<T, C>
402where
403    T: Send + Sync + 'static,
404    C: Codec<T> + CodecName + Send + Sync + 'static,
405{
406    /// Create a new AxonBuilder
407    pub fn new() -> Self {
408        Self {
409            neuron: None,
410            ganglion: None,
411        }
412    }
413
414    /// Set the neuron for the axon
415    pub fn with_neuron(mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync + 'static>) -> Self {
416        self.neuron = Some(neuron);
417        self
418    }
419
420    /// Set the ganglion for the axon
421    pub fn with_ganglion(
422        mut self,
423        ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>,
424    ) -> Self {
425        self.ganglion = Some(ganglion);
426        self
427    }
428
429    /// Build the AxonImpl instance
430    pub fn build(self) -> Result<AxonImpl<T, C>, String> {
431        let neuron = self.neuron.ok_or("Neuron is required")?;
432        let ganglion = self.ganglion.ok_or("Ganglion is required")?;
433
434        let axon = AxonImpl::new(neuron, ganglion);
435        axon.validate()?;
436        Ok(axon)
437    }
438
439    /// Build the AxonImpl instance without validation
440    pub fn build_unchecked(self) -> Result<AxonImpl<T, C>, String> {
441        let neuron = self.neuron.ok_or("Neuron is required")?;
442        let ganglion = self.ganglion.ok_or("Ganglion is required")?;
443
444        Ok(AxonImpl::new(neuron, ganglion))
445    }
446}
447
448impl<T, C> Default for AxonBuilder<T, C>
449where
450    T: Send + Sync + 'static,
451    C: Codec<T> + CodecName + Send + Sync + 'static,
452{
453    fn default() -> Self {
454        Self::new()
455    }
456}
457
458impl<T, C> Axon<T, C> for AxonImpl<T, C>
459where
460    T: Send + Sync + 'static,
461    C: Codec<T> + CodecName + Send + Sync + 'static,
462{
463    fn react(
464        &mut self,
465        reactants: Vec<Box<dyn Reactant<T, C> + Send + Sync + 'static>>,
466    ) -> Pin<Box<dyn Future<Output = Result<(), AxonError>> + Send + '_>> {
467        let neuron_name = self.neuron.name();
468        let ganglion = self.ganglion.clone();
469
470        Box::pin(async move {
471            // Convert typed reactants to erased reactants
472            let erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = reactants
473                .into_iter()
474                .map(|reactant| reactant.erase())
475                .collect();
476
477            let future = {
478                let mut ganglion_guard = ganglion.lock().await;
479                ganglion_guard.react(neuron_name, erased_reactants, vec![])
480            };
481            match future.await {
482                Ok(result) => Ok(result),
483                Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
484            }
485        })
486    }
487
488    fn transmit(
489        &mut self,
490        data: T,
491    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
492        let neuron = self.neuron.clone();
493        let ganglion = self.ganglion.clone();
494
495        Box::pin(async move {
496            // Create a payload
497            let payload = Payload::new(data, neuron.clone());
498
499            // Transmit through the ganglion
500            let future = {
501                let mut ganglion_guard = ganglion.lock().await;
502                ganglion_guard.transmit(erase_payload(payload))
503            };
504            match future.await {
505                Ok(result) => Ok(result),
506                Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
507            }
508        })
509    }
510
511    fn transmit_with_trace(
512        &mut self,
513        data: T,
514        trace_context: TraceContext,
515    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
516        let neuron = self.neuron.clone();
517        let ganglion = self.ganglion.clone();
518
519        Box::pin(async move {
520            // Create a payload with explicit trace info
521            let payload = Arc::new(Payload::from_parts(
522                Arc::new(data),
523                neuron.clone(),
524                trace_context.new_child(),
525            ));
526
527            // Transmit through the ganglion
528            let future = {
529                let mut ganglion_guard = ganglion.lock().await;
530                ganglion_guard.transmit(erase_payload(payload))
531            };
532            match future.await {
533                Ok(result) => Ok(result),
534                Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
535            }
536        })
537    }
538
539    fn transmit_erased(
540        &mut self,
541        data: T,
542    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
543        let neuron = self.neuron.clone();
544        let ganglion = self.ganglion.clone();
545
546        Box::pin(
547            async move {
548                // Create a payload
549                let payload = Payload::new(data, neuron.clone());
550                let erased_payload = erase_payload(payload);
551                let span = erased_payload.span_debug("AxonImpl::transmit_erased");
552
553                async move {
554                    tracing::debug!("Starting erased transmission");
555                    let future = {
556                        let mut ganglion_guard = ganglion.lock().await;
557                        ganglion_guard.transmit(erased_payload)
558                    };
559                    match future.await {
560                        Ok(result) => Ok(result),
561                        Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
562                    }
563                }
564                .instrument(span)
565                .await
566            },
567        )
568    }
569
570    fn transmit_erased_with_trace(
571        &mut self,
572        data: T,
573        trace_context: TraceContext,
574    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
575        let neuron = self.neuron.clone();
576        let ganglion = self.ganglion.clone();
577
578        Box::pin(async move {
579            // Create a payload with explicit trace info
580            let payload = Arc::new(Payload::from_parts(
581                Arc::new(data),
582                neuron.clone(),
583                trace_context.new_child(),
584            ));
585
586            // Convert to erased payload
587            let erased_payload = erase_payload(payload);
588            let span = erased_payload.span_debug("AxonImpl::transmit_erased_with_trace");
589
590            async move {
591                tracing::debug!("Starting erased transmission with trace");
592
593                let future = {
594                    let mut ganglion_guard = ganglion.lock().await;
595                    ganglion_guard.transmit(erased_payload)
596                };
597                match future.await {
598                    Ok(result) => Ok(result),
599                    Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
600                }
601            }
602            .instrument(span)
603            .await
604        })
605    }
606
607    fn neuron_name(&self) -> String {
608        self.neuron.name()
609    }
610}
611
612impl<T, C> std::fmt::Debug for AxonImpl<T, C>
613where
614    T: Send + Sync + 'static,
615    C: Codec<T> + CodecName + Send + Sync + 'static,
616{
617    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
618        f.debug_struct("AxonImpl")
619            .field("neuron", &self.neuron.name())
620            .finish()
621    }
622}
623
624#[cfg(test)]
625mod tests {
626    use super::*;
627    use crate::ganglion::{Ganglion, GanglionInprocess};
628    use crate::neuron::NeuronImpl;
629    use crate::payload::Payload;
630    use crate::plexus::Plexus;
631    use crate::test_utils::{DebugCodec, DebugStruct, TokioMpscReactant, test_namespace};
632    use tokio::sync::{Mutex, mpsc::channel};
633    use uuid::Uuid;
634
635    #[tokio::test]
636    async fn test_axon_creation() {
637        let ns = test_namespace();
638        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> =
639            Arc::new(NeuronImpl::new(ns));
640        let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
641            Arc::new(Mutex::new(GanglionInprocess::new()));
642
643        let axon = AxonImpl::new(neuron, ganglion);
644
645        assert_eq!(axon.neuron_name(), "dev.plexo.DebugStruct.debug");
646    }
647
648    #[tokio::test]
649    async fn test_axon_transmit_with_plexus() {
650        let ns = test_namespace();
651        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
652        let neuron_arc = neuron_impl.clone_to_arc();
653
654        // Create a plexus to use as the ganglion
655        let mut plexus = Plexus::new(vec![], vec![]).await;
656
657        // Adapt the neuron to the plexus
658        plexus
659            .adapt(neuron_arc.clone())
660            .await
661            .expect("Failed to adapt neuron");
662
663        // Create channels for receiving transmitted data
664        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
665        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
666
667        // Create axon with the plexus as ganglion
668        let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
669            Arc::new(Mutex::new(plexus));
670        let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
671
672        let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> = vec![
673            Box::new(TokioMpscReactant::new(tx1)),
674            Box::new(TokioMpscReactant::new(tx2)),
675        ];
676
677        axon.react(reactants).await.expect("Failed to react");
678
679        // Create test data to transmit
680        let test_data = DebugStruct {
681            foo: 42,
682            bar: "test transmission".to_string(),
683        };
684
685        // Transmit data through the axon
686        let trace_context = TraceContext::new();
687        let correlation_id = trace_context.correlation_id;
688        axon.transmit_with_trace(test_data.clone(), trace_context)
689            .await
690            .expect("Failed to transmit");
691
692        // Verify that the data was received by both reactants
693        let received1 = rx1.recv().await.expect("Failed to receive payload 1");
694        let received2 = rx2.recv().await.expect("Failed to receive payload 2");
695
696        assert_eq!(received1.value.foo, 42);
697        assert_eq!(received1.value.bar, "test transmission");
698        assert_eq!(received2.value.foo, 42);
699        assert_eq!(received2.value.bar, "test transmission");
700        assert_eq!(received1.correlation_id(), correlation_id);
701        assert_eq!(received2.correlation_id(), correlation_id);
702    }
703
704    #[tokio::test]
705    async fn test_axon_transmit_erased() {
706        let ns = test_namespace();
707        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
708        let neuron_arc = neuron_impl.clone_to_arc();
709
710        // Create a plexus to use as the ganglion
711        let mut plexus = Plexus::new(vec![], vec![]).await;
712
713        // Adapt the neuron to the plexus
714        plexus
715            .adapt(neuron_arc.clone())
716            .await
717            .expect("Failed to adapt neuron");
718
719        // Create channel for receiving transmitted data
720        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
721
722        // Create axon with the plexus as ganglion
723        let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
724            Arc::new(Mutex::new(plexus));
725        let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
726
727        let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
728            vec![Box::new(TokioMpscReactant::new(tx))];
729
730        axon.react(reactants).await.expect("Failed to react");
731
732        // Create test data to transmit
733        let test_data = DebugStruct {
734            foo: 99,
735            bar: "erased test".to_string(),
736        };
737
738        // Transmit data through the axon using transmit_erased
739        let trace_context = TraceContext::new();
740        let correlation_id = trace_context.correlation_id;
741        axon.transmit_erased_with_trace(test_data.clone(), trace_context)
742            .await
743            .expect("Failed to transmit erased");
744
745        // Verify that the data was received
746        let received = rx.recv().await.expect("Failed to receive payload");
747
748        assert_eq!(received.value.foo, 99);
749        assert_eq!(received.value.bar, "erased test");
750        assert_eq!(received.correlation_id(), correlation_id);
751    }
752
753    #[tokio::test]
754    async fn test_axon_transmit_without_correlation_id() {
755        let ns = test_namespace();
756        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
757        let neuron_arc = neuron_impl.clone_to_arc();
758
759        // Create a plexus to use as the ganglion
760        let mut plexus = Plexus::new(vec![], vec![]).await;
761
762        // Adapt the neuron to the plexus
763        plexus
764            .adapt(neuron_arc.clone())
765            .await
766            .expect("Failed to adapt neuron");
767
768        // Create channel for receiving transmitted data
769        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
770
771        // Create axon with the plexus as ganglion
772        let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
773            Arc::new(Mutex::new(plexus));
774        let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
775
776        let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
777            vec![Box::new(TokioMpscReactant::new(tx))];
778
779        axon.react(reactants).await.expect("Failed to react");
780
781        // Create test data to transmit
782        let test_data = DebugStruct {
783            foo: 123,
784            bar: "auto correlation".to_string(),
785        };
786
787        // Transmit data through the axon without providing correlation_id
788        axon.transmit(test_data.clone())
789            .await
790            .expect("Failed to transmit");
791
792        // Verify that the data was received and correlation_id was auto-generated
793        let received = rx.recv().await.expect("Failed to receive payload");
794
795        assert_eq!(received.value.foo, 123);
796        assert_eq!(received.value.bar, "auto correlation");
797        // Correlation ID should be auto-generated (not empty)
798        assert_ne!(received.correlation_id(), Uuid::nil());
799    }
800
801    #[tokio::test]
802    async fn test_axon_multiple_transmissions() {
803        let ns = test_namespace();
804        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
805        let neuron_arc = neuron_impl.clone_to_arc();
806
807        // Create a plexus to use as the ganglion
808        let mut plexus = Plexus::new(vec![], vec![]).await;
809
810        // Adapt the neuron to the plexus
811        plexus
812            .adapt(neuron_arc.clone())
813            .await
814            .expect("Failed to adapt neuron");
815
816        // Create channel for receiving transmitted data
817        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
818
819        // Create axon with the plexus as ganglion
820        let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
821            Arc::new(Mutex::new(plexus));
822        let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
823
824        let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
825            vec![Box::new(TokioMpscReactant::new(tx))];
826
827        axon.react(reactants).await.expect("Failed to react");
828
829        // Transmit multiple pieces of data
830        for i in 0..5 {
831            let test_data = DebugStruct {
832                foo: i,
833                bar: format!("transmission {i}"),
834            };
835
836            axon.transmit(test_data)
837                .await
838                .expect("Failed to transmit");
839        }
840
841        // Verify that all transmissions were received
842        for i in 0..5 {
843            let received = rx
844                .recv()
845                .await
846                .unwrap_or_else(|| panic!("Failed to receive payload {i}"));
847
848            assert_eq!(received.value.foo, i);
849            assert_eq!(received.value.bar, format!("transmission {i}"));
850        }
851    }
852
853    #[tokio::test]
854    async fn test_axon_transmit_with_plexus_external_ganglion() {
855        use crate::erasure::reactant::erase_reactant_raw;
856        use crate::ganglion::GanglionExternal;
857        use crate::neuron::NeuronImpl;
858        use crate::payload::PayloadRaw;
859        use crate::plexus::Plexus;
860        use crate::test_utils::{
861            DebugCodec, DebugStruct, GanglionExternalInprocess, TokioMpscReactantRaw,
862            test_namespace,
863        };
864        use tokio::sync::mpsc::channel;
865        use tokio::time::{Duration, sleep};
866
867        let ns = test_namespace();
868        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
869        let neuron_arc = neuron_impl.clone_to_arc();
870
871        // Create a plexus to use as the ganglion
872        let mut plexus = Plexus::new(vec![], vec![]).await;
873
874        // Adapt the neuron to the plexus
875        plexus
876            .adapt(neuron_arc.clone())
877            .await
878            .expect("Failed to adapt neuron");
879
880        // Create channel for receiving raw payloads from external ganglion
881        let (tx_raw, mut rx_raw) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(2);
882
883        let mut external_ganglion = GanglionExternalInprocess::new();
884
885        // Adapt the neuron to the external ganglion
886        external_ganglion
887            .adapt(neuron_arc.clone())
888            .await
889            .expect("Failed to adapt neuron to external ganglion");
890
891        // Create raw reactants and add them using react
892        let raw_reactants = vec![erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(
893            TokioMpscReactantRaw::new(tx_raw),
894        ))];
895        external_ganglion
896            .react(neuron_arc.name(), vec![], raw_reactants, vec![])
897            .await
898            .expect("Failed to react with external ganglion");
899
900        // Add the external ganglion to the plexus
901        let external_ganglion_arc = Arc::new(Mutex::new(external_ganglion));
902        let _ = plexus
903            .infuse_external_ganglion(external_ganglion_arc.clone())
904            .await;
905
906        // Create axon with the plexus as ganglion
907        let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
908            Arc::new(Mutex::new(plexus));
909        let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
910
911        // No need to add reactants to the axon since we're testing the external ganglion
912        let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
913            vec![];
914        axon.react(reactants).await.expect("Failed to react");
915
916        // Create test data to transmit
917        let test_data = DebugStruct {
918            foo: 99,
919            bar: "external ganglion test".to_string(),
920        };
921
922        // Transmit data through the axon
923        let trace_context = TraceContext::new();
924        let correlation_id = trace_context.correlation_id;
925        axon.transmit_with_trace(test_data.clone(), trace_context)
926            .await
927            .expect("Failed to transmit");
928
929        // Give some time for async transmission to complete
930        sleep(Duration::from_millis(100)).await;
931
932        // Verify that the external ganglion received the payloadraw
933        assert_eq!(
934            rx_raw.len(),
935            1,
936            "External ganglion should have received exactly one payload"
937        );
938        let received_raw = rx_raw
939            .recv()
940            .await
941            .expect("Failed to receive raw payload from external ganglion");
942
943        // Decode the raw payload to verify it contains the correct data
944        let decoded_data = DebugCodec::decode(&received_raw.value).expect("Failed to decode");
945        assert_eq!(decoded_data.foo, 99);
946        assert_eq!(decoded_data.bar, "external ganglion test");
947        assert_eq!(received_raw.correlation_id(), correlation_id);
948    }
949
950    #[tokio::test]
951    async fn test_axon_transmit_simple() {
952        let ns = test_namespace();
953        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
954        let neuron_arc = neuron_impl.clone_to_arc();
955
956        let mut plexus = Plexus::new(vec![], vec![]).await;
957        plexus
958            .adapt(neuron_arc.clone())
959            .await
960            .expect("Failed to adapt neuron");
961
962        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
963        let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
964            Arc::new(Mutex::new(plexus));
965        let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
966
967        let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
968            vec![Box::new(TokioMpscReactant::new(tx))];
969        axon.react(reactants).await.expect("Failed to react");
970
971        let test_data = DebugStruct {
972            foo: 555,
973            bar: "simple transmission".to_string(),
974        };
975
976        // Test transmit convenience method
977        axon.transmit(test_data.clone())
978            .await
979            .expect("Failed to transmit simple");
980
981        let received = rx.recv().await.expect("Failed to receive payload");
982        assert_eq!(received.value.foo, 555);
983        assert_eq!(received.value.bar, "simple transmission");
984        assert_ne!(received.correlation_id(), Uuid::nil());
985    }
986
987    #[tokio::test]
988    async fn test_axon_transmit_with_timeout() {
989        let ns = test_namespace();
990        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
991        let neuron_arc = neuron_impl.clone_to_arc();
992
993        let mut plexus = Plexus::new(vec![], vec![]).await;
994        plexus
995            .adapt(neuron_arc.clone())
996            .await
997            .expect("Failed to adapt neuron");
998
999        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
1000        let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
1001            Arc::new(Mutex::new(plexus));
1002        let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
1003
1004        let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
1005            vec![Box::new(TokioMpscReactant::new(tx))];
1006        axon.react(reactants).await.expect("Failed to react");
1007
1008        let test_data = DebugStruct {
1009            foo: 777,
1010            bar: "timeout test".to_string(),
1011        };
1012
1013        // Test transmit_with_timeout with a reasonable timeout
1014        let timeout_duration = Duration::from_secs(1);
1015        axon.transmit_with_timeout(test_data.clone(), timeout_duration)
1016            .await
1017            .expect("Failed to transmit with timeout");
1018
1019        let received = rx.recv().await.expect("Failed to receive payload");
1020        assert_eq!(received.value.foo, 777);
1021        assert_eq!(received.value.bar, "timeout test");
1022    }
1023
1024    #[tokio::test]
1025    async fn test_axon_transmit_batch() {
1026        let ns = test_namespace();
1027        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1028        let neuron_arc = neuron_impl.clone_to_arc();
1029
1030        let mut plexus = Plexus::new(vec![], vec![]).await;
1031        plexus
1032            .adapt(neuron_arc.clone())
1033            .await
1034            .expect("Failed to adapt neuron");
1035
1036        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
1037        let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
1038            Arc::new(Mutex::new(plexus));
1039        let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
1040
1041        let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
1042            vec![Box::new(TokioMpscReactant::new(tx))];
1043        axon.react(reactants).await.expect("Failed to react");
1044
1045        // Create batch data
1046        let batch_data = vec![
1047            DebugStruct {
1048                foo: 1,
1049                bar: "batch item 1".to_string(),
1050            },
1051            DebugStruct {
1052                foo: 2,
1053                bar: "batch item 2".to_string(),
1054            },
1055            DebugStruct {
1056                foo: 3,
1057                bar: "batch item 3".to_string(),
1058            },
1059        ];
1060
1061        // Test transmit_batch
1062        let results = axon
1063            .transmit_batch(batch_data.clone())
1064            .await
1065            .expect("Failed to transmit batch");
1066        assert_eq!(results.len(), 3);
1067
1068        // Verify all items were received
1069        for i in 0..3 {
1070            let received = rx
1071                .recv()
1072                .await
1073                .unwrap_or_else(|| panic!("Failed to receive batch item {i}"));
1074            assert_eq!(received.value.foo, (i + 1));
1075            assert_eq!(received.value.bar, format!("batch item {}", i + 1));
1076            // Check that the parent_id is set since this is a batch transmission which might reuse context or generate new
1077            // In the implementation, transmit_batch calls transmit_simple which calls transmit(data, None).
1078            // So parent_id should be None and correlation_id should be set.
1079            assert_ne!(received.correlation_id(), Uuid::nil());
1080        }
1081    }
1082
1083    #[tokio::test]
1084    async fn test_axon_status_and_validation() {
1085        let ns = test_namespace();
1086        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1087        let neuron_arc = neuron_impl.clone_to_arc();
1088
1089        let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
1090            Arc::new(Mutex::new(GanglionInprocess::new()));
1091        let axon = AxonImpl::new(neuron_arc, ganglion);
1092
1093        // Test is_ready
1094        assert!(axon.is_ready());
1095
1096        // Test status
1097        let status = axon.status();
1098        assert!(status.contains("dev.plexo.DebugStruct.debug"));
1099
1100        // Test validation
1101        assert!(axon.validate().is_ok());
1102
1103        // Test neuron_name
1104        assert_eq!(axon.neuron_name(), "dev.plexo.DebugStruct.debug");
1105    }
1106
1107    #[tokio::test]
1108    async fn test_axon_builder() {
1109        let ns = test_namespace();
1110        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1111        let neuron_arc = neuron_impl.clone_to_arc();
1112
1113        let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
1114            Arc::new(Mutex::new(GanglionInprocess::new()));
1115
1116        // Test builder pattern
1117        let axon = AxonImpl::builder()
1118            .with_neuron(neuron_arc.clone())
1119            .with_ganglion(ganglion.clone())
1120            .build()
1121            .expect("Failed to build axon");
1122
1123        assert_eq!(axon.neuron_name(), "dev.plexo.DebugStruct.debug");
1124        assert!(axon.is_ready());
1125
1126        // Test builder with missing components
1127        let result = AxonImpl::<DebugStruct, DebugCodec>::builder()
1128            .with_neuron(neuron_arc.clone())
1129            .build();
1130        assert!(result.is_err());
1131        assert!(result.unwrap_err().contains("Ganglion is required"));
1132
1133        // Test build_unchecked
1134        let axon_unchecked = AxonImpl::builder()
1135            .with_neuron(neuron_arc.clone())
1136            .with_ganglion(ganglion.clone())
1137            .build_unchecked()
1138            .expect("Failed to build unchecked axon");
1139
1140        assert_eq!(axon_unchecked.neuron_name(), "dev.plexo.DebugStruct.debug");
1141    }
1142
1143    #[tokio::test]
1144    async fn test_axon_additional_methods() {
1145        let ns = test_namespace();
1146        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1147        let neuron_arc = neuron_impl.clone_to_arc();
1148
1149        let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
1150            Arc::new(Mutex::new(GanglionInprocess::new()));
1151        let axon = AxonImpl::new(neuron_arc.clone(), ganglion.clone());
1152
1153        // Test clone methods
1154        let cloned_neuron = axon.clone_neuron();
1155        assert_eq!(cloned_neuron.name(), neuron_arc.name());
1156
1157        // Test ganglion_id
1158        let ganglion_id = axon.ganglion_id().await;
1159        assert_ne!(ganglion_id, Uuid::nil());
1160
1161        // Test neuron and ganglion getters
1162        assert_eq!(axon.neuron().name(), "dev.plexo.DebugStruct.debug");
1163    }
1164
1165    #[test]
1166    fn test_axon_error_variants_and_conversion() {
1167        let ganglion_id = Uuid::now_v7();
1168        let ganglion_name = "TestGanglion".to_string();
1169        let neuron_name = "test_neuron".to_string();
1170
1171        // Test GanglionError::SynapseNotFound -> AxonError::NeuronNotAdapted conversion
1172        let ganglion_synapse_not_found = GanglionError::SynapseNotFound {
1173            neuron_name: neuron_name.clone(),
1174            ganglion_name: ganglion_name.clone(),
1175            ganglion_id,
1176        };
1177        let axon_neuron_not_adapted = AxonError::from_ganglion_error(ganglion_synapse_not_found);
1178        match axon_neuron_not_adapted {
1179            AxonError::NeuronNotAdapted {
1180                neuron_name: n,
1181                ganglion_name: g,
1182                ganglion_id: id,
1183            } => {
1184                assert_eq!(n, neuron_name);
1185                assert_eq!(g, ganglion_name);
1186                assert_eq!(id, ganglion_id);
1187            }
1188            _ => panic!("Expected NeuronNotAdapted variant"),
1189        }
1190
1191        // Test GanglionError::SynapseLockError -> AxonError::SynapseLockError conversion
1192        let ganglion_synapse_lock = GanglionError::SynapseLock {
1193            neuron_name: neuron_name.clone(),
1194            ganglion_name: ganglion_name.clone(),
1195            ganglion_id,
1196        };
1197        let axon_synapse_lock = AxonError::from_ganglion_error(ganglion_synapse_lock);
1198        match axon_synapse_lock {
1199            AxonError::SynapseLock {
1200                neuron_name: n,
1201                ganglion_name: g,
1202                ganglion_id: id,
1203            } => {
1204                assert_eq!(n, neuron_name);
1205                assert_eq!(g, ganglion_name);
1206                assert_eq!(id, ganglion_id);
1207            }
1208            _ => panic!("Expected SynapseLockError variant"),
1209        }
1210
1211        // Test GanglionError::Transmit -> AxonError::Transmit conversion
1212        let ganglion_transmit = GanglionError::Transmit {
1213            neuron_name: neuron_name.clone(),
1214            ganglion_name: ganglion_name.clone(),
1215            ganglion_id,
1216            message: "test transmit error".to_string(),
1217        };
1218        let axon_transmit = AxonError::from_ganglion_error(ganglion_transmit);
1219        match axon_transmit {
1220            AxonError::Transmit {
1221                neuron_name: n,
1222                ganglion_name: g,
1223                ganglion_id: id,
1224                message: m,
1225            } => {
1226                assert_eq!(n, neuron_name);
1227                assert_eq!(g, ganglion_name);
1228                assert_eq!(id, ganglion_id);
1229                assert_eq!(m, "test transmit error");
1230            }
1231            _ => panic!("Expected Transmit variant"),
1232        }
1233
1234        // Test direct AxonError creation
1235        let direct_neuron_not_adapted = AxonError::NeuronNotAdapted {
1236            neuron_name: neuron_name.clone(),
1237            ganglion_name: ganglion_name.clone(),
1238            ganglion_id,
1239        };
1240        assert!(direct_neuron_not_adapted.to_string().contains(&neuron_name));
1241        assert!(
1242            direct_neuron_not_adapted
1243                .to_string()
1244                .contains(&ganglion_name)
1245        );
1246
1247        let direct_transmit = AxonError::Transmit {
1248            neuron_name: neuron_name.clone(),
1249            ganglion_name: ganglion_name.clone(),
1250            ganglion_id,
1251            message: "direct message".to_string(),
1252        };
1253        assert!(direct_transmit.to_string().contains(&neuron_name));
1254        assert!(direct_transmit.to_string().contains(&ganglion_name));
1255        assert!(direct_transmit.to_string().contains("direct message"));
1256    }
1257}