Skip to main content

plexor_core/
axon.rs

1// Copyright 2025 Alecks Gates
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7//! Axon module for handling neuron-ganglion interactions with type erasure.
8//!
9//! This module provides the Axon trait and AxonImpl struct that manage the interface
10//! between neurons and ganglia, handling type erasure and providing a clean API
11//! for neuron operations.
12
13use crate::codec::{Codec, CodecName};
14use crate::erasure::payload::erase_payload;
15use crate::erasure::reactant::ReactantErased;
16use crate::ganglion::{GanglionError, GanglionInternal};
17use crate::logging::TraceContext;
18use crate::neuron::Neuron;
19use crate::payload::Payload;
20use crate::reactant::Reactant;
21use std::future::Future;
22use std::pin::Pin;
23use std::sync::Arc;
24use std::time::Duration;
25use thiserror::Error;
26use tokio::sync::Mutex;
27use tokio::time::timeout;
28use tracing::Instrument;
29use uuid::Uuid;
30
31/// Error types for Axon operations
32#[derive(Error, Debug)]
33pub enum AxonError {
34    #[error(
35        "Neuron needs to be adapted to ganglion {ganglion_name} (id: {ganglion_id}): {neuron_name}"
36    )]
37    NeuronNotAdapted {
38        neuron_name: String,
39        ganglion_name: String,
40        ganglion_id: Uuid,
41    },
42    #[error(
43        "Failed to acquire lock on synapse: {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})"
44    )]
45    SynapseLock {
46        neuron_name: String,
47        ganglion_name: String,
48        ganglion_id: Uuid,
49    },
50    #[error(
51        "Transmission error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id}): {message}"
52    )]
53    Transmit {
54        neuron_name: String,
55        ganglion_name: String,
56        ganglion_id: Uuid,
57        message: String,
58    },
59    #[error("Encode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
60    Encode {
61        neuron_name: String,
62        ganglion_name: String,
63        ganglion_id: Uuid,
64    },
65    #[error("Decode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
66    Decode {
67        neuron_name: String,
68        ganglion_name: String,
69        ganglion_id: Uuid,
70    },
71    #[error("Transmission timeout")]
72    TransmissionTimeout,
73    #[error("Ganglion error: {0}")]
74    GanglionError(#[from] GanglionError),
75}
76
77impl AxonError {
78    /// Convert a GanglionError to AxonError, translating SynapseNotFound appropriately
79    pub fn from_ganglion_error(error: GanglionError) -> Self {
80        match error {
81            GanglionError::SynapseNotFound {
82                neuron_name,
83                ganglion_name,
84                ganglion_id,
85            } => AxonError::NeuronNotAdapted {
86                neuron_name,
87                ganglion_name,
88                ganglion_id,
89            },
90            GanglionError::SynapseLock {
91                neuron_name,
92                ganglion_name,
93                ganglion_id,
94            } => AxonError::SynapseLock {
95                neuron_name,
96                ganglion_name,
97                ganglion_id,
98            },
99            GanglionError::Transmit {
100                neuron_name,
101                ganglion_name,
102                ganglion_id,
103                message,
104            } => AxonError::Transmit {
105                neuron_name,
106                ganglion_name,
107                ganglion_id,
108                message,
109            },
110            GanglionError::Encode {
111                neuron_name,
112                ganglion_name,
113                ganglion_id,
114            } => AxonError::Encode {
115                neuron_name,
116                ganglion_name,
117                ganglion_id,
118            },
119            GanglionError::Decode {
120                neuron_name,
121                ganglion_name,
122                ganglion_id,
123            } => AxonError::Decode {
124                neuron_name,
125                ganglion_name,
126                ganglion_id,
127            },
128            GanglionError::Adapt { .. } | GanglionError::QueueFull { .. } => {
129                // For adaptation and queue full errors, use the generic GanglionError wrapper
130                AxonError::GanglionError(error)
131            }
132        }
133    }
134}
135
136/// Trait for axon operations with typed neurons and data
137pub trait Axon<T, C>: Send + Sync
138where
139    T: Send + Sync + 'static,
140    C: Codec<T> + CodecName + Send + Sync + 'static,
141{
142    /// React to incoming reactants using the neuron
143    fn react(
144        &mut self,
145        reactants: Vec<Box<dyn Reactant<T, C> + Send + Sync + 'static>>,
146    ) -> Pin<Box<dyn Future<Output = Result<(), AxonError>> + Send + '_>>;
147
148    /// Transmit data through the neuron and ganglion
149    fn transmit(
150        &mut self,
151        data: T,
152    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;
153
154    /// Transmit data with explicit tracing context.
155    fn transmit_with_trace(
156        &mut self,
157        data: T,
158        trace_context: TraceContext,
159    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;
160
161    /// Transmit data through the neuron and ganglion using type erasure
162    fn transmit_erased(
163        &mut self,
164        data: T,
165    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;
166
167    /// Transmit data using type erasure with explicit tracing context.
168    fn transmit_erased_with_trace(
169        &mut self,
170        data: T,
171        trace_context: TraceContext,
172    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;
173
174    /// Get the neuron name
175    fn neuron_name(&self) -> String;
176
177    /// Transmit data with a timeout
178    fn transmit_with_timeout(
179        &mut self,
180        data: T,
181        timeout_duration: Duration,
182    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
183        Box::pin(async move {
184            match timeout(timeout_duration, self.transmit(data)).await {
185                Ok(result) => result,
186                Err(_) => Err(AxonError::TransmissionTimeout),
187            }
188        })
189    }
190
191    /// Transmit data with a trace and timeout
192    fn transmit_with_trace_with_timeout(
193        &mut self,
194        data: T,
195        trace_context: TraceContext,
196        timeout_duration: Duration,
197    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
198        Box::pin(async move {
199            match timeout(timeout_duration, self.transmit_with_trace(data, trace_context)).await {
200                Ok(result) => result,
201                Err(_) => Err(AxonError::TransmissionTimeout),
202            }
203        })
204    }
205
206    /// Transmit multiple data items in batch
207    #[allow(clippy::type_complexity)]
208    fn transmit_batch(
209        &mut self,
210        data_items: Vec<T>,
211    ) -> Pin<Box<dyn Future<Output = Result<Vec<Vec<()>>, AxonError>> + Send + '_>> {
212        Box::pin(async move {
213            let mut results = Vec::new();
214            for item in data_items {
215                match self.transmit(item).await {
216                    Ok(result) => results.push(result),
217                    Err(e) => return Err(e),
218                }
219            }
220            Ok(results)
221        })
222    }
223
224    /// Transmit multiple data items in batch with trace
225    #[allow(clippy::type_complexity)]
226    fn transmit_batch_with_trace(
227        &mut self,
228        data_items: Vec<T>,
229        trace_context: TraceContext,
230    ) -> Pin<Box<dyn Future<Output = Result<Vec<Vec<()>>, AxonError>> + Send + '_>> {
231        Box::pin(async move {
232            let mut results = Vec::new();
233            for item in data_items {
234                match self.transmit_with_trace(item, trace_context).await {
235                    Ok(result) => results.push(result),
236                    Err(e) => return Err(e),
237                }
238            }
239            Ok(results)
240        })
241    }
242
243    /// Check if the axon is ready for transmission (convenience method)
244    fn is_ready(&self) -> bool {
245        !self.neuron_name().is_empty()
246    }
247
248    /// Get a formatted status string for debugging
249    fn status(&self) -> String {
250        format!("Axon[neuron: {}]", self.neuron_name())
251    }
252}
253
254/// Implementation of Axon that stores a neuron and ganglion with type erasure handling
255pub struct AxonImpl<T, C>
256where
257    T: Send + Sync + 'static,
258    C: Codec<T> + CodecName + Send + Sync + 'static,
259{
260    /// The neuron this axon manages
261    neuron: Arc<dyn Neuron<T, C> + Send + Sync + 'static>,
262
263    /// The ganglion this axon uses for operations
264    ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>,
265}
266
267impl<T, C> Clone for AxonImpl<T, C>
268where
269    T: Send + Sync + 'static,
270    C: Codec<T> + CodecName + Send + Sync + 'static,
271{
272    fn clone(&self) -> Self {
273        Self {
274            neuron: self.neuron.clone(),
275            ganglion: self.ganglion.clone(),
276        }
277    }
278}
279
280impl<T, C> AxonImpl<T, C>
281where
282    T: Send + Sync + 'static,
283    C: Codec<T> + CodecName + Send + Sync + 'static,
284{
285    /// Create a new AxonImpl with the given neuron and ganglion
286    pub fn new(
287        neuron: Arc<dyn Neuron<T, C> + Send + Sync + 'static>,
288        ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>,
289    ) -> Self {
290        Self { neuron, ganglion }
291    }
292
293    /// Create a builder for constructing an AxonImpl
294    pub fn builder() -> AxonBuilder<T, C> {
295        AxonBuilder::new()
296    }
297
298    /// Get a reference to the underlying neuron
299    pub fn neuron(&self) -> &Arc<dyn Neuron<T, C> + Send + Sync + 'static> {
300        &self.neuron
301    }
302
303    /// Get a reference to the underlying ganglion
304    pub fn ganglion(&self) -> &Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>> {
305        &self.ganglion
306    }
307
308    /// Clone the neuron for sharing
309    pub fn clone_neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync + 'static> {
310        self.neuron.clone()
311    }
312
313    /// Clone the ganglion for sharing
314    pub fn clone_ganglion(&self) -> Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>> {
315        self.ganglion.clone()
316    }
317
318    /// Get the unique ID of the underlying ganglion
319    pub async fn ganglion_id(&self) -> Uuid {
320        self.ganglion.lock().await.unique_id()
321    }
322
323    /// Check if the axon components are properly configured
324    pub fn validate(&self) -> Result<(), String> {
325        if self.neuron.name().is_empty() {
326            return Err("Neuron name is empty".to_string());
327        }
328        Ok(())
329    }
330}
331
332/// Builder for constructing AxonImpl instances with a fluent API
333pub struct AxonBuilder<T, C>
334where
335    T: Send + Sync + 'static,
336    C: Codec<T> + CodecName + Send + Sync + 'static,
337{
338    neuron: Option<Arc<dyn Neuron<T, C> + Send + Sync + 'static>>,
339    ganglion: Option<Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>,
340}
341
342impl<T, C> AxonBuilder<T, C>
343where
344    T: Send + Sync + 'static,
345    C: Codec<T> + CodecName + Send + Sync + 'static,
346{
347    /// Create a new AxonBuilder
348    pub fn new() -> Self {
349        Self {
350            neuron: None,
351            ganglion: None,
352        }
353    }
354
355    /// Set the neuron for the axon
356    pub fn with_neuron(mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync + 'static>) -> Self {
357        self.neuron = Some(neuron);
358        self
359    }
360
361    /// Set the ganglion for the axon
362    pub fn with_ganglion(
363        mut self,
364        ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>,
365    ) -> Self {
366        self.ganglion = Some(ganglion);
367        self
368    }
369
370    /// Build the AxonImpl instance
371    pub fn build(self) -> Result<AxonImpl<T, C>, String> {
372        let neuron = self.neuron.ok_or("Neuron is required")?;
373        let ganglion = self.ganglion.ok_or("Ganglion is required")?;
374
375        let axon = AxonImpl::new(neuron, ganglion);
376        axon.validate()?;
377        Ok(axon)
378    }
379
380    /// Build the AxonImpl instance without validation
381    pub fn build_unchecked(self) -> Result<AxonImpl<T, C>, String> {
382        let neuron = self.neuron.ok_or("Neuron is required")?;
383        let ganglion = self.ganglion.ok_or("Ganglion is required")?;
384
385        Ok(AxonImpl::new(neuron, ganglion))
386    }
387}
388
389impl<T, C> Default for AxonBuilder<T, C>
390where
391    T: Send + Sync + 'static,
392    C: Codec<T> + CodecName + Send + Sync + 'static,
393{
394    fn default() -> Self {
395        Self::new()
396    }
397}
398
399impl<T, C> Axon<T, C> for AxonImpl<T, C>
400where
401    T: Send + Sync + 'static,
402    C: Codec<T> + CodecName + Send + Sync + 'static,
403{
404    fn react(
405        &mut self,
406        reactants: Vec<Box<dyn Reactant<T, C> + Send + Sync + 'static>>,
407    ) -> Pin<Box<dyn Future<Output = Result<(), AxonError>> + Send + '_>> {
408        let neuron_name = self.neuron.name();
409        let ganglion = self.ganglion.clone();
410
411        Box::pin(async move {
412            // Convert typed reactants to erased reactants
413            let erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = reactants
414                .into_iter()
415                .map(|reactant| reactant.erase())
416                .collect();
417
418            let future = {
419                let mut ganglion_guard = ganglion.lock().await;
420                ganglion_guard.react(neuron_name, erased_reactants, vec![])
421            };
422            match future.await {
423                Ok(result) => Ok(result),
424                Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
425            }
426        })
427    }
428
429    fn transmit(
430        &mut self,
431        data: T,
432    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
433        let neuron = self.neuron.clone();
434        let ganglion = self.ganglion.clone();
435
436        Box::pin(async move {
437            // Create a payload
438            let payload = Payload::new(data, neuron.clone());
439
440            // Transmit through the ganglion
441            let future = {
442                let mut ganglion_guard = ganglion.lock().await;
443                ganglion_guard.transmit(erase_payload(payload))
444            };
445            match future.await {
446                Ok(result) => Ok(result),
447                Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
448            }
449        })
450    }
451
452    fn transmit_with_trace(
453        &mut self,
454        data: T,
455        trace_context: TraceContext,
456    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
457        let neuron = self.neuron.clone();
458        let ganglion = self.ganglion.clone();
459
460        Box::pin(async move {
461            // Create a payload with explicit trace info
462            let payload = Arc::new(Payload::from_parts(
463                Arc::new(data),
464                neuron.clone(),
465                trace_context.new_child(),
466            ));
467
468            // Transmit through the ganglion
469            let future = {
470                let mut ganglion_guard = ganglion.lock().await;
471                ganglion_guard.transmit(erase_payload(payload))
472            };
473            match future.await {
474                Ok(result) => Ok(result),
475                Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
476            }
477        })
478    }
479
480    fn transmit_erased(
481        &mut self,
482        data: T,
483    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
484        let neuron = self.neuron.clone();
485        let ganglion = self.ganglion.clone();
486
487        Box::pin(
488            async move {
489                // Create a payload
490                let payload = Payload::new(data, neuron.clone());
491                let erased_payload = erase_payload(payload);
492                let span = erased_payload.span_debug("AxonImpl::transmit_erased");
493
494                async move {
495                    tracing::debug!("Starting erased transmission");
496                    let future = {
497                        let mut ganglion_guard = ganglion.lock().await;
498                        ganglion_guard.transmit(erased_payload)
499                    };
500                    match future.await {
501                        Ok(result) => Ok(result),
502                        Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
503                    }
504                }
505                .instrument(span)
506                .await
507            },
508        )
509    }
510
511    fn transmit_erased_with_trace(
512        &mut self,
513        data: T,
514        trace_context: TraceContext,
515    ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
516        let neuron = self.neuron.clone();
517        let ganglion = self.ganglion.clone();
518
519        Box::pin(async move {
520            // Create a payload with explicit trace info
521            let payload = Arc::new(Payload::from_parts(
522                Arc::new(data),
523                neuron.clone(),
524                trace_context.new_child(),
525            ));
526
527            // Convert to erased payload
528            let erased_payload = erase_payload(payload);
529            let span = erased_payload.span_debug("AxonImpl::transmit_erased_with_trace");
530
531            async move {
532                tracing::debug!("Starting erased transmission with trace");
533
534                let future = {
535                    let mut ganglion_guard = ganglion.lock().await;
536                    ganglion_guard.transmit(erased_payload)
537                };
538                match future.await {
539                    Ok(result) => Ok(result),
540                    Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
541                }
542            }
543            .instrument(span)
544            .await
545        })
546    }
547
548    fn neuron_name(&self) -> String {
549        self.neuron.name()
550    }
551}
552
553impl<T, C> std::fmt::Debug for AxonImpl<T, C>
554where
555    T: Send + Sync + 'static,
556    C: Codec<T> + CodecName + Send + Sync + 'static,
557{
558    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
559        f.debug_struct("AxonImpl")
560            .field("neuron", &self.neuron.name())
561            .finish()
562    }
563}
564
565#[cfg(test)]
566mod tests {
567    use super::*;
568    use crate::ganglion::{Ganglion, GanglionInprocess};
569    use crate::neuron::NeuronImpl;
570    use crate::payload::Payload;
571    use crate::plexus::Plexus;
572    use crate::test_utils::{DebugCodec, DebugStruct, TokioMpscReactant, test_namespace};
573    use tokio::sync::{Mutex, mpsc::channel};
574    use uuid::Uuid;
575
576    #[tokio::test]
577    async fn test_axon_creation() {
578        let ns = test_namespace();
579        let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> =
580            Arc::new(NeuronImpl::new(ns));
581        let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
582            Arc::new(Mutex::new(GanglionInprocess::new()));
583
584        let axon = AxonImpl::new(neuron, ganglion);
585
586        assert_eq!(axon.neuron_name(), "dev.plexo.DebugStruct.debug");
587    }
588
589    #[tokio::test]
590    async fn test_axon_transmit_with_plexus() {
591        let ns = test_namespace();
592        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
593        let neuron_arc = neuron_impl.clone_to_arc();
594
595        // Create a plexus to use as the ganglion
596        let mut plexus = Plexus::new(vec![], vec![]).await;
597
598        // Adapt the neuron to the plexus
599        plexus
600            .adapt(neuron_arc.clone())
601            .await
602            .expect("Failed to adapt neuron");
603
604        // Create channels for receiving transmitted data
605        let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
606        let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
607
608        // Create axon with the plexus as ganglion
609        let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
610            Arc::new(Mutex::new(plexus));
611        let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
612
613        let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> = vec![
614            Box::new(TokioMpscReactant::new(tx1)),
615            Box::new(TokioMpscReactant::new(tx2)),
616        ];
617
618        axon.react(reactants).await.expect("Failed to react");
619
620        // Create test data to transmit
621        let test_data = DebugStruct {
622            foo: 42,
623            bar: "test transmission".to_string(),
624        };
625
626        // Transmit data through the axon
627        let trace_context = TraceContext::new();
628        let correlation_id = trace_context.correlation_id;
629        axon.transmit_with_trace(test_data.clone(), trace_context)
630            .await
631            .expect("Failed to transmit");
632
633        // Verify that the data was received by both reactants
634        let received1 = rx1.recv().await.expect("Failed to receive payload 1");
635        let received2 = rx2.recv().await.expect("Failed to receive payload 2");
636
637        assert_eq!(received1.value.foo, 42);
638        assert_eq!(received1.value.bar, "test transmission");
639        assert_eq!(received2.value.foo, 42);
640        assert_eq!(received2.value.bar, "test transmission");
641        assert_eq!(received1.correlation_id(), correlation_id);
642        assert_eq!(received2.correlation_id(), correlation_id);
643    }
644
645    #[tokio::test]
646    async fn test_axon_transmit_erased() {
647        let ns = test_namespace();
648        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
649        let neuron_arc = neuron_impl.clone_to_arc();
650
651        // Create a plexus to use as the ganglion
652        let mut plexus = Plexus::new(vec![], vec![]).await;
653
654        // Adapt the neuron to the plexus
655        plexus
656            .adapt(neuron_arc.clone())
657            .await
658            .expect("Failed to adapt neuron");
659
660        // Create channel for receiving transmitted data
661        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
662
663        // Create axon with the plexus as ganglion
664        let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
665            Arc::new(Mutex::new(plexus));
666        let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
667
668        let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
669            vec![Box::new(TokioMpscReactant::new(tx))];
670
671        axon.react(reactants).await.expect("Failed to react");
672
673        // Create test data to transmit
674        let test_data = DebugStruct {
675            foo: 99,
676            bar: "erased test".to_string(),
677        };
678
679        // Transmit data through the axon using transmit_erased
680        let trace_context = TraceContext::new();
681        let correlation_id = trace_context.correlation_id;
682        axon.transmit_erased_with_trace(test_data.clone(), trace_context)
683            .await
684            .expect("Failed to transmit erased");
685
686        // Verify that the data was received
687        let received = rx.recv().await.expect("Failed to receive payload");
688
689        assert_eq!(received.value.foo, 99);
690        assert_eq!(received.value.bar, "erased test");
691        assert_eq!(received.correlation_id(), correlation_id);
692    }
693
694    #[tokio::test]
695    async fn test_axon_transmit_without_correlation_id() {
696        let ns = test_namespace();
697        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
698        let neuron_arc = neuron_impl.clone_to_arc();
699
700        // Create a plexus to use as the ganglion
701        let mut plexus = Plexus::new(vec![], vec![]).await;
702
703        // Adapt the neuron to the plexus
704        plexus
705            .adapt(neuron_arc.clone())
706            .await
707            .expect("Failed to adapt neuron");
708
709        // Create channel for receiving transmitted data
710        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
711
712        // Create axon with the plexus as ganglion
713        let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
714            Arc::new(Mutex::new(plexus));
715        let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
716
717        let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
718            vec![Box::new(TokioMpscReactant::new(tx))];
719
720        axon.react(reactants).await.expect("Failed to react");
721
722        // Create test data to transmit
723        let test_data = DebugStruct {
724            foo: 123,
725            bar: "auto correlation".to_string(),
726        };
727
728        // Transmit data through the axon without providing correlation_id
729        axon.transmit(test_data.clone())
730            .await
731            .expect("Failed to transmit");
732
733        // Verify that the data was received and correlation_id was auto-generated
734        let received = rx.recv().await.expect("Failed to receive payload");
735
736        assert_eq!(received.value.foo, 123);
737        assert_eq!(received.value.bar, "auto correlation");
738        // Correlation ID should be auto-generated (not empty)
739        assert_ne!(received.correlation_id(), Uuid::nil());
740    }
741
742    #[tokio::test]
743    async fn test_axon_multiple_transmissions() {
744        let ns = test_namespace();
745        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
746        let neuron_arc = neuron_impl.clone_to_arc();
747
748        // Create a plexus to use as the ganglion
749        let mut plexus = Plexus::new(vec![], vec![]).await;
750
751        // Adapt the neuron to the plexus
752        plexus
753            .adapt(neuron_arc.clone())
754            .await
755            .expect("Failed to adapt neuron");
756
757        // Create channel for receiving transmitted data
758        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
759
760        // Create axon with the plexus as ganglion
761        let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
762            Arc::new(Mutex::new(plexus));
763        let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
764
765        let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
766            vec![Box::new(TokioMpscReactant::new(tx))];
767
768        axon.react(reactants).await.expect("Failed to react");
769
770        // Transmit multiple pieces of data
771        for i in 0..5 {
772            let test_data = DebugStruct {
773                foo: i,
774                bar: format!("transmission {i}"),
775            };
776
777            axon.transmit(test_data)
778                .await
779                .expect("Failed to transmit");
780        }
781
782        // Verify that all transmissions were received
783        for i in 0..5 {
784            let received = rx
785                .recv()
786                .await
787                .unwrap_or_else(|| panic!("Failed to receive payload {i}"));
788
789            assert_eq!(received.value.foo, i);
790            assert_eq!(received.value.bar, format!("transmission {i}"));
791        }
792    }
793
794    #[tokio::test]
795    async fn test_axon_transmit_with_plexus_external_ganglion() {
796        use crate::erasure::reactant::erase_reactant_raw;
797        use crate::ganglion::GanglionExternal;
798        use crate::neuron::NeuronImpl;
799        use crate::payload::PayloadRaw;
800        use crate::plexus::Plexus;
801        use crate::test_utils::{
802            DebugCodec, DebugStruct, GanglionExternalInprocess, TokioMpscReactantRaw,
803            test_namespace,
804        };
805        use tokio::sync::mpsc::channel;
806        use tokio::time::{Duration, sleep};
807
808        let ns = test_namespace();
809        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
810        let neuron_arc = neuron_impl.clone_to_arc();
811
812        // Create a plexus to use as the ganglion
813        let mut plexus = Plexus::new(vec![], vec![]).await;
814
815        // Adapt the neuron to the plexus
816        plexus
817            .adapt(neuron_arc.clone())
818            .await
819            .expect("Failed to adapt neuron");
820
821        // Create channel for receiving raw payloads from external ganglion
822        let (tx_raw, mut rx_raw) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(2);
823
824        let mut external_ganglion = GanglionExternalInprocess::new();
825
826        // Adapt the neuron to the external ganglion
827        external_ganglion
828            .adapt(neuron_arc.clone())
829            .await
830            .expect("Failed to adapt neuron to external ganglion");
831
832        // Create raw reactants and add them using react
833        let raw_reactants = vec![erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(
834            TokioMpscReactantRaw::new(tx_raw),
835        ))];
836        external_ganglion
837            .react(neuron_arc.name(), vec![], raw_reactants, vec![])
838            .await
839            .expect("Failed to react with external ganglion");
840
841        // Add the external ganglion to the plexus
842        let external_ganglion_arc = Arc::new(Mutex::new(external_ganglion));
843        let _ = plexus
844            .infuse_external_ganglion(external_ganglion_arc.clone())
845            .await;
846
847        // Create axon with the plexus as ganglion
848        let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
849            Arc::new(Mutex::new(plexus));
850        let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
851
852        // No need to add reactants to the axon since we're testing the external ganglion
853        let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
854            vec![];
855        axon.react(reactants).await.expect("Failed to react");
856
857        // Create test data to transmit
858        let test_data = DebugStruct {
859            foo: 99,
860            bar: "external ganglion test".to_string(),
861        };
862
863        // Transmit data through the axon
864        let trace_context = TraceContext::new();
865        let correlation_id = trace_context.correlation_id;
866        axon.transmit_with_trace(test_data.clone(), trace_context)
867            .await
868            .expect("Failed to transmit");
869
870        // Give some time for async transmission to complete
871        sleep(Duration::from_millis(100)).await;
872
873        // Verify that the external ganglion received the payloadraw
874        assert_eq!(
875            rx_raw.len(),
876            1,
877            "External ganglion should have received exactly one payload"
878        );
879        let received_raw = rx_raw
880            .recv()
881            .await
882            .expect("Failed to receive raw payload from external ganglion");
883
884        // Decode the raw payload to verify it contains the correct data
885        let decoded_data = DebugCodec::decode(&received_raw.value).expect("Failed to decode");
886        assert_eq!(decoded_data.foo, 99);
887        assert_eq!(decoded_data.bar, "external ganglion test");
888        assert_eq!(received_raw.correlation_id(), correlation_id);
889    }
890
891    #[tokio::test]
892    async fn test_axon_transmit_simple() {
893        let ns = test_namespace();
894        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
895        let neuron_arc = neuron_impl.clone_to_arc();
896
897        let mut plexus = Plexus::new(vec![], vec![]).await;
898        plexus
899            .adapt(neuron_arc.clone())
900            .await
901            .expect("Failed to adapt neuron");
902
903        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
904        let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
905            Arc::new(Mutex::new(plexus));
906        let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
907
908        let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
909            vec![Box::new(TokioMpscReactant::new(tx))];
910        axon.react(reactants).await.expect("Failed to react");
911
912        let test_data = DebugStruct {
913            foo: 555,
914            bar: "simple transmission".to_string(),
915        };
916
917        // Test transmit convenience method
918        axon.transmit(test_data.clone())
919            .await
920            .expect("Failed to transmit simple");
921
922        let received = rx.recv().await.expect("Failed to receive payload");
923        assert_eq!(received.value.foo, 555);
924        assert_eq!(received.value.bar, "simple transmission");
925        assert_ne!(received.correlation_id(), Uuid::nil());
926    }
927
928    #[tokio::test]
929    async fn test_axon_transmit_with_timeout() {
930        let ns = test_namespace();
931        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
932        let neuron_arc = neuron_impl.clone_to_arc();
933
934        let mut plexus = Plexus::new(vec![], vec![]).await;
935        plexus
936            .adapt(neuron_arc.clone())
937            .await
938            .expect("Failed to adapt neuron");
939
940        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
941        let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
942            Arc::new(Mutex::new(plexus));
943        let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
944
945        let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
946            vec![Box::new(TokioMpscReactant::new(tx))];
947        axon.react(reactants).await.expect("Failed to react");
948
949        let test_data = DebugStruct {
950            foo: 777,
951            bar: "timeout test".to_string(),
952        };
953
954        // Test transmit_with_timeout with a reasonable timeout
955        let timeout_duration = Duration::from_secs(1);
956        axon.transmit_with_timeout(test_data.clone(), timeout_duration)
957            .await
958            .expect("Failed to transmit with timeout");
959
960        let received = rx.recv().await.expect("Failed to receive payload");
961        assert_eq!(received.value.foo, 777);
962        assert_eq!(received.value.bar, "timeout test");
963    }
964
965    #[tokio::test]
966    async fn test_axon_transmit_batch() {
967        let ns = test_namespace();
968        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
969        let neuron_arc = neuron_impl.clone_to_arc();
970
971        let mut plexus = Plexus::new(vec![], vec![]).await;
972        plexus
973            .adapt(neuron_arc.clone())
974            .await
975            .expect("Failed to adapt neuron");
976
977        let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
978        let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
979            Arc::new(Mutex::new(plexus));
980        let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
981
982        let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
983            vec![Box::new(TokioMpscReactant::new(tx))];
984        axon.react(reactants).await.expect("Failed to react");
985
986        // Create batch data
987        let batch_data = vec![
988            DebugStruct {
989                foo: 1,
990                bar: "batch item 1".to_string(),
991            },
992            DebugStruct {
993                foo: 2,
994                bar: "batch item 2".to_string(),
995            },
996            DebugStruct {
997                foo: 3,
998                bar: "batch item 3".to_string(),
999            },
1000        ];
1001
1002        // Test transmit_batch
1003        let results = axon
1004            .transmit_batch(batch_data.clone())
1005            .await
1006            .expect("Failed to transmit batch");
1007        assert_eq!(results.len(), 3);
1008
1009        // Verify all items were received
1010        for i in 0..3 {
1011            let received = rx
1012                .recv()
1013                .await
1014                .unwrap_or_else(|| panic!("Failed to receive batch item {i}"));
1015            assert_eq!(received.value.foo, (i + 1));
1016            assert_eq!(received.value.bar, format!("batch item {}", i + 1));
1017            // Check that the parent_id is set since this is a batch transmission which might reuse context or generate new
1018            // In the implementation, transmit_batch calls transmit_simple which calls transmit(data, None).
1019            // So parent_id should be None and correlation_id should be set.
1020            assert_ne!(received.correlation_id(), Uuid::nil());
1021        }
1022    }
1023
1024    #[tokio::test]
1025    async fn test_axon_status_and_validation() {
1026        let ns = test_namespace();
1027        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1028        let neuron_arc = neuron_impl.clone_to_arc();
1029
1030        let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
1031            Arc::new(Mutex::new(GanglionInprocess::new()));
1032        let axon = AxonImpl::new(neuron_arc, ganglion);
1033
1034        // Test is_ready
1035        assert!(axon.is_ready());
1036
1037        // Test status
1038        let status = axon.status();
1039        assert!(status.contains("dev.plexo.DebugStruct.debug"));
1040
1041        // Test validation
1042        assert!(axon.validate().is_ok());
1043
1044        // Test neuron_name
1045        assert_eq!(axon.neuron_name(), "dev.plexo.DebugStruct.debug");
1046    }
1047
1048    #[tokio::test]
1049    async fn test_axon_builder() {
1050        let ns = test_namespace();
1051        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1052        let neuron_arc = neuron_impl.clone_to_arc();
1053
1054        let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
1055            Arc::new(Mutex::new(GanglionInprocess::new()));
1056
1057        // Test builder pattern
1058        let axon = AxonImpl::builder()
1059            .with_neuron(neuron_arc.clone())
1060            .with_ganglion(ganglion.clone())
1061            .build()
1062            .expect("Failed to build axon");
1063
1064        assert_eq!(axon.neuron_name(), "dev.plexo.DebugStruct.debug");
1065        assert!(axon.is_ready());
1066
1067        // Test builder with missing components
1068        let result = AxonImpl::<DebugStruct, DebugCodec>::builder()
1069            .with_neuron(neuron_arc.clone())
1070            .build();
1071        assert!(result.is_err());
1072        assert!(result.unwrap_err().contains("Ganglion is required"));
1073
1074        // Test build_unchecked
1075        let axon_unchecked = AxonImpl::builder()
1076            .with_neuron(neuron_arc.clone())
1077            .with_ganglion(ganglion.clone())
1078            .build_unchecked()
1079            .expect("Failed to build unchecked axon");
1080
1081        assert_eq!(axon_unchecked.neuron_name(), "dev.plexo.DebugStruct.debug");
1082    }
1083
1084    #[tokio::test]
1085    async fn test_axon_additional_methods() {
1086        let ns = test_namespace();
1087        let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1088        let neuron_arc = neuron_impl.clone_to_arc();
1089
1090        let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
1091            Arc::new(Mutex::new(GanglionInprocess::new()));
1092        let axon = AxonImpl::new(neuron_arc.clone(), ganglion.clone());
1093
1094        // Test clone methods
1095        let cloned_neuron = axon.clone_neuron();
1096        assert_eq!(cloned_neuron.name(), neuron_arc.name());
1097
1098        // Test ganglion_id
1099        let ganglion_id = axon.ganglion_id().await;
1100        assert_ne!(ganglion_id, Uuid::nil());
1101
1102        // Test neuron and ganglion getters
1103        assert_eq!(axon.neuron().name(), "dev.plexo.DebugStruct.debug");
1104    }
1105
1106    #[test]
1107    fn test_axon_error_variants_and_conversion() {
1108        let ganglion_id = Uuid::now_v7();
1109        let ganglion_name = "TestGanglion".to_string();
1110        let neuron_name = "test_neuron".to_string();
1111
1112        // Test GanglionError::SynapseNotFound -> AxonError::NeuronNotAdapted conversion
1113        let ganglion_synapse_not_found = GanglionError::SynapseNotFound {
1114            neuron_name: neuron_name.clone(),
1115            ganglion_name: ganglion_name.clone(),
1116            ganglion_id,
1117        };
1118        let axon_neuron_not_adapted = AxonError::from_ganglion_error(ganglion_synapse_not_found);
1119        match axon_neuron_not_adapted {
1120            AxonError::NeuronNotAdapted {
1121                neuron_name: n,
1122                ganglion_name: g,
1123                ganglion_id: id,
1124            } => {
1125                assert_eq!(n, neuron_name);
1126                assert_eq!(g, ganglion_name);
1127                assert_eq!(id, ganglion_id);
1128            }
1129            _ => panic!("Expected NeuronNotAdapted variant"),
1130        }
1131
1132        // Test GanglionError::SynapseLockError -> AxonError::SynapseLockError conversion
1133        let ganglion_synapse_lock = GanglionError::SynapseLock {
1134            neuron_name: neuron_name.clone(),
1135            ganglion_name: ganglion_name.clone(),
1136            ganglion_id,
1137        };
1138        let axon_synapse_lock = AxonError::from_ganglion_error(ganglion_synapse_lock);
1139        match axon_synapse_lock {
1140            AxonError::SynapseLock {
1141                neuron_name: n,
1142                ganglion_name: g,
1143                ganglion_id: id,
1144            } => {
1145                assert_eq!(n, neuron_name);
1146                assert_eq!(g, ganglion_name);
1147                assert_eq!(id, ganglion_id);
1148            }
1149            _ => panic!("Expected SynapseLockError variant"),
1150        }
1151
1152        // Test GanglionError::Transmit -> AxonError::Transmit conversion
1153        let ganglion_transmit = GanglionError::Transmit {
1154            neuron_name: neuron_name.clone(),
1155            ganglion_name: ganglion_name.clone(),
1156            ganglion_id,
1157            message: "test transmit error".to_string(),
1158        };
1159        let axon_transmit = AxonError::from_ganglion_error(ganglion_transmit);
1160        match axon_transmit {
1161            AxonError::Transmit {
1162                neuron_name: n,
1163                ganglion_name: g,
1164                ganglion_id: id,
1165                message: m,
1166            } => {
1167                assert_eq!(n, neuron_name);
1168                assert_eq!(g, ganglion_name);
1169                assert_eq!(id, ganglion_id);
1170                assert_eq!(m, "test transmit error");
1171            }
1172            _ => panic!("Expected Transmit variant"),
1173        }
1174
1175        // Test direct AxonError creation
1176        let direct_neuron_not_adapted = AxonError::NeuronNotAdapted {
1177            neuron_name: neuron_name.clone(),
1178            ganglion_name: ganglion_name.clone(),
1179            ganglion_id,
1180        };
1181        assert!(direct_neuron_not_adapted.to_string().contains(&neuron_name));
1182        assert!(
1183            direct_neuron_not_adapted
1184                .to_string()
1185                .contains(&ganglion_name)
1186        );
1187
1188        let direct_transmit = AxonError::Transmit {
1189            neuron_name: neuron_name.clone(),
1190            ganglion_name: ganglion_name.clone(),
1191            ganglion_id,
1192            message: "direct message".to_string(),
1193        };
1194        assert!(direct_transmit.to_string().contains(&neuron_name));
1195        assert!(direct_transmit.to_string().contains(&ganglion_name));
1196        assert!(direct_transmit.to_string().contains("direct message"));
1197    }
1198}