1use crate::codec::{Codec, CodecName};
14use crate::erasure::payload::erase_payload;
15use crate::erasure::reactant::ReactantErased;
16use crate::ganglion::{GanglionError, GanglionInternal};
17use crate::logging::TraceContext;
18use crate::neuron::Neuron;
19use crate::payload::Payload;
20use crate::reactant::Reactant;
21use std::future::Future;
22use std::pin::Pin;
23use std::sync::Arc;
24use std::time::Duration;
25use thiserror::Error;
26use tokio::sync::Mutex;
27use tokio::time::timeout;
28use tracing::Instrument;
29use uuid::Uuid;
30
31#[derive(Error, Debug)]
33pub enum AxonError {
34 #[error(
35 "Neuron needs to be adapted to ganglion {ganglion_name} (id: {ganglion_id}): {neuron_name}"
36 )]
37 NeuronNotAdapted {
38 neuron_name: String,
39 ganglion_name: String,
40 ganglion_id: Uuid,
41 },
42 #[error(
43 "Failed to acquire lock on synapse: {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})"
44 )]
45 SynapseLock {
46 neuron_name: String,
47 ganglion_name: String,
48 ganglion_id: Uuid,
49 },
50 #[error(
51 "Transmission error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id}): {message}"
52 )]
53 Transmit {
54 neuron_name: String,
55 ganglion_name: String,
56 ganglion_id: Uuid,
57 message: String,
58 },
59 #[error("Encode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
60 Encode {
61 neuron_name: String,
62 ganglion_name: String,
63 ganglion_id: Uuid,
64 },
65 #[error("Decode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
66 Decode {
67 neuron_name: String,
68 ganglion_name: String,
69 ganglion_id: Uuid,
70 },
71 #[error("Transmission timeout")]
72 TransmissionTimeout,
73 #[error("Ganglion error: {0}")]
74 GanglionError(#[from] GanglionError),
75}
76
77impl AxonError {
78 pub fn from_ganglion_error(error: GanglionError) -> Self {
80 match error {
81 GanglionError::SynapseNotFound {
82 neuron_name,
83 ganglion_name,
84 ganglion_id,
85 } => AxonError::NeuronNotAdapted {
86 neuron_name,
87 ganglion_name,
88 ganglion_id,
89 },
90 GanglionError::SynapseLock {
91 neuron_name,
92 ganglion_name,
93 ganglion_id,
94 } => AxonError::SynapseLock {
95 neuron_name,
96 ganglion_name,
97 ganglion_id,
98 },
99 GanglionError::Transmit {
100 neuron_name,
101 ganglion_name,
102 ganglion_id,
103 message,
104 } => AxonError::Transmit {
105 neuron_name,
106 ganglion_name,
107 ganglion_id,
108 message,
109 },
110 GanglionError::Encode {
111 neuron_name,
112 ganglion_name,
113 ganglion_id,
114 } => AxonError::Encode {
115 neuron_name,
116 ganglion_name,
117 ganglion_id,
118 },
119 GanglionError::Decode {
120 neuron_name,
121 ganglion_name,
122 ganglion_id,
123 } => AxonError::Decode {
124 neuron_name,
125 ganglion_name,
126 ganglion_id,
127 },
128 GanglionError::Adapt { .. } | GanglionError::QueueFull { .. } => {
129 AxonError::GanglionError(error)
131 }
132 }
133 }
134}
135
136pub trait Axon<T, C>: Send + Sync
138where
139 T: Send + Sync + 'static,
140 C: Codec<T> + CodecName + Send + Sync + 'static,
141{
142 fn react(
144 &mut self,
145 reactants: Vec<Box<dyn Reactant<T, C> + Send + Sync + 'static>>,
146 ) -> Pin<Box<dyn Future<Output = Result<(), AxonError>> + Send + '_>>;
147
148 fn transmit(
181 &mut self,
182 data: T,
183 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;
184 fn transmit_with_trace(
186 &mut self,
187 data: T,
188 trace_context: TraceContext,
189 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;
190
191 fn transmit_erased(
193 &mut self,
194 data: T,
195 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;
196
197 fn transmit_erased_with_trace(
199 &mut self,
200 data: T,
201 trace_context: TraceContext,
202 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;
203
204 fn neuron_name(&self) -> String;
206
207 fn transmit_with_timeout(
209 &mut self,
210 data: T,
211 timeout_duration: Duration,
212 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
213 Box::pin(async move {
214 match timeout(timeout_duration, self.transmit(data)).await {
215 Ok(result) => result,
216 Err(_) => Err(AxonError::TransmissionTimeout),
217 }
218 })
219 }
220
221 fn transmit_with_trace_with_timeout(
223 &mut self,
224 data: T,
225 trace_context: TraceContext,
226 timeout_duration: Duration,
227 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
228 Box::pin(async move {
229 match timeout(timeout_duration, self.transmit_with_trace(data, trace_context)).await {
230 Ok(result) => result,
231 Err(_) => Err(AxonError::TransmissionTimeout),
232 }
233 })
234 }
235
236 #[allow(clippy::type_complexity)]
238 fn transmit_batch(
239 &mut self,
240 data_items: Vec<T>,
241 ) -> Pin<Box<dyn Future<Output = Result<Vec<Vec<()>>, AxonError>> + Send + '_>> {
242 Box::pin(async move {
243 let mut results = Vec::new();
244 for item in data_items {
245 match self.transmit(item).await {
246 Ok(result) => results.push(result),
247 Err(e) => return Err(e),
248 }
249 }
250 Ok(results)
251 })
252 }
253
254 #[allow(clippy::type_complexity)]
256 fn transmit_batch_with_trace(
257 &mut self,
258 data_items: Vec<T>,
259 trace_context: TraceContext,
260 ) -> Pin<Box<dyn Future<Output = Result<Vec<Vec<()>>, AxonError>> + Send + '_>> {
261 Box::pin(async move {
262 let mut results = Vec::new();
263 for item in data_items {
264 match self.transmit_with_trace(item, trace_context).await {
265 Ok(result) => results.push(result),
266 Err(e) => return Err(e),
267 }
268 }
269 Ok(results)
270 })
271 }
272
273 fn is_ready(&self) -> bool {
275 !self.neuron_name().is_empty()
276 }
277
278 fn status(&self) -> String {
280 format!("Axon[neuron: {}]", self.neuron_name())
281 }
282}
283
284pub struct AxonImpl<T, C>
315where
316 T: Send + Sync + 'static,
317 C: Codec<T> + CodecName + Send + Sync + 'static,
318{
319 neuron: Arc<dyn Neuron<T, C> + Send + Sync + 'static>,
321
322 ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>,
324}
325
326impl<T, C> Clone for AxonImpl<T, C>
327where
328 T: Send + Sync + 'static,
329 C: Codec<T> + CodecName + Send + Sync + 'static,
330{
331 fn clone(&self) -> Self {
332 Self {
333 neuron: self.neuron.clone(),
334 ganglion: self.ganglion.clone(),
335 }
336 }
337}
338
339impl<T, C> AxonImpl<T, C>
340where
341 T: Send + Sync + 'static,
342 C: Codec<T> + CodecName + Send + Sync + 'static,
343{
344 pub fn new(
346 neuron: Arc<dyn Neuron<T, C> + Send + Sync + 'static>,
347 ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>,
348 ) -> Self {
349 Self { neuron, ganglion }
350 }
351
352 pub fn builder() -> AxonBuilder<T, C> {
354 AxonBuilder::new()
355 }
356
357 pub fn neuron(&self) -> &Arc<dyn Neuron<T, C> + Send + Sync + 'static> {
359 &self.neuron
360 }
361
362 pub fn ganglion(&self) -> &Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>> {
364 &self.ganglion
365 }
366
367 pub fn clone_neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync + 'static> {
369 self.neuron.clone()
370 }
371
372 pub fn clone_ganglion(&self) -> Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>> {
374 self.ganglion.clone()
375 }
376
377 pub async fn ganglion_id(&self) -> Uuid {
379 self.ganglion.lock().await.unique_id()
380 }
381
382 pub fn validate(&self) -> Result<(), String> {
384 if self.neuron.name().is_empty() {
385 return Err("Neuron name is empty".to_string());
386 }
387 Ok(())
388 }
389}
390
391pub struct AxonBuilder<T, C>
393where
394 T: Send + Sync + 'static,
395 C: Codec<T> + CodecName + Send + Sync + 'static,
396{
397 neuron: Option<Arc<dyn Neuron<T, C> + Send + Sync + 'static>>,
398 ganglion: Option<Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>,
399}
400
401impl<T, C> AxonBuilder<T, C>
402where
403 T: Send + Sync + 'static,
404 C: Codec<T> + CodecName + Send + Sync + 'static,
405{
406 pub fn new() -> Self {
408 Self {
409 neuron: None,
410 ganglion: None,
411 }
412 }
413
414 pub fn with_neuron(mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync + 'static>) -> Self {
416 self.neuron = Some(neuron);
417 self
418 }
419
420 pub fn with_ganglion(
422 mut self,
423 ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>,
424 ) -> Self {
425 self.ganglion = Some(ganglion);
426 self
427 }
428
429 pub fn build(self) -> Result<AxonImpl<T, C>, String> {
431 let neuron = self.neuron.ok_or("Neuron is required")?;
432 let ganglion = self.ganglion.ok_or("Ganglion is required")?;
433
434 let axon = AxonImpl::new(neuron, ganglion);
435 axon.validate()?;
436 Ok(axon)
437 }
438
439 pub fn build_unchecked(self) -> Result<AxonImpl<T, C>, String> {
441 let neuron = self.neuron.ok_or("Neuron is required")?;
442 let ganglion = self.ganglion.ok_or("Ganglion is required")?;
443
444 Ok(AxonImpl::new(neuron, ganglion))
445 }
446}
447
448impl<T, C> Default for AxonBuilder<T, C>
449where
450 T: Send + Sync + 'static,
451 C: Codec<T> + CodecName + Send + Sync + 'static,
452{
453 fn default() -> Self {
454 Self::new()
455 }
456}
457
458impl<T, C> Axon<T, C> for AxonImpl<T, C>
459where
460 T: Send + Sync + 'static,
461 C: Codec<T> + CodecName + Send + Sync + 'static,
462{
463 fn react(
464 &mut self,
465 reactants: Vec<Box<dyn Reactant<T, C> + Send + Sync + 'static>>,
466 ) -> Pin<Box<dyn Future<Output = Result<(), AxonError>> + Send + '_>> {
467 let neuron_name = self.neuron.name();
468 let ganglion = self.ganglion.clone();
469
470 Box::pin(async move {
471 let erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = reactants
473 .into_iter()
474 .map(|reactant| reactant.erase())
475 .collect();
476
477 let future = {
478 let mut ganglion_guard = ganglion.lock().await;
479 ganglion_guard.react(neuron_name, erased_reactants, vec![])
480 };
481 match future.await {
482 Ok(result) => Ok(result),
483 Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
484 }
485 })
486 }
487
488 fn transmit(
489 &mut self,
490 data: T,
491 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
492 let neuron = self.neuron.clone();
493 let ganglion = self.ganglion.clone();
494
495 Box::pin(async move {
496 let payload = Payload::new(data, neuron.clone());
498
499 let future = {
501 let mut ganglion_guard = ganglion.lock().await;
502 ganglion_guard.transmit(erase_payload(payload))
503 };
504 match future.await {
505 Ok(result) => Ok(result),
506 Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
507 }
508 })
509 }
510
511 fn transmit_with_trace(
512 &mut self,
513 data: T,
514 trace_context: TraceContext,
515 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
516 let neuron = self.neuron.clone();
517 let ganglion = self.ganglion.clone();
518
519 Box::pin(async move {
520 let payload = Arc::new(Payload::from_parts(
522 Arc::new(data),
523 neuron.clone(),
524 trace_context.new_child(),
525 ));
526
527 let future = {
529 let mut ganglion_guard = ganglion.lock().await;
530 ganglion_guard.transmit(erase_payload(payload))
531 };
532 match future.await {
533 Ok(result) => Ok(result),
534 Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
535 }
536 })
537 }
538
539 fn transmit_erased(
540 &mut self,
541 data: T,
542 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
543 let neuron = self.neuron.clone();
544 let ganglion = self.ganglion.clone();
545
546 Box::pin(
547 async move {
548 let payload = Payload::new(data, neuron.clone());
550 let erased_payload = erase_payload(payload);
551 let span = erased_payload.span_debug("AxonImpl::transmit_erased");
552
553 async move {
554 tracing::debug!("Starting erased transmission");
555 let future = {
556 let mut ganglion_guard = ganglion.lock().await;
557 ganglion_guard.transmit(erased_payload)
558 };
559 match future.await {
560 Ok(result) => Ok(result),
561 Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
562 }
563 }
564 .instrument(span)
565 .await
566 },
567 )
568 }
569
570 fn transmit_erased_with_trace(
571 &mut self,
572 data: T,
573 trace_context: TraceContext,
574 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
575 let neuron = self.neuron.clone();
576 let ganglion = self.ganglion.clone();
577
578 Box::pin(async move {
579 let payload = Arc::new(Payload::from_parts(
581 Arc::new(data),
582 neuron.clone(),
583 trace_context.new_child(),
584 ));
585
586 let erased_payload = erase_payload(payload);
588 let span = erased_payload.span_debug("AxonImpl::transmit_erased_with_trace");
589
590 async move {
591 tracing::debug!("Starting erased transmission with trace");
592
593 let future = {
594 let mut ganglion_guard = ganglion.lock().await;
595 ganglion_guard.transmit(erased_payload)
596 };
597 match future.await {
598 Ok(result) => Ok(result),
599 Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
600 }
601 }
602 .instrument(span)
603 .await
604 })
605 }
606
607 fn neuron_name(&self) -> String {
608 self.neuron.name()
609 }
610}
611
612impl<T, C> std::fmt::Debug for AxonImpl<T, C>
613where
614 T: Send + Sync + 'static,
615 C: Codec<T> + CodecName + Send + Sync + 'static,
616{
617 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
618 f.debug_struct("AxonImpl")
619 .field("neuron", &self.neuron.name())
620 .finish()
621 }
622}
623
624#[cfg(test)]
625mod tests {
626 use super::*;
627 use crate::ganglion::{Ganglion, GanglionInprocess};
628 use crate::neuron::NeuronImpl;
629 use crate::payload::Payload;
630 use crate::plexus::Plexus;
631 use crate::test_utils::{DebugCodec, DebugStruct, TokioMpscReactant, test_namespace};
632 use tokio::sync::{Mutex, mpsc::channel};
633 use uuid::Uuid;
634
635 #[tokio::test]
636 async fn test_axon_creation() {
637 let ns = test_namespace();
638 let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> =
639 Arc::new(NeuronImpl::new(ns));
640 let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
641 Arc::new(Mutex::new(GanglionInprocess::new()));
642
643 let axon = AxonImpl::new(neuron, ganglion);
644
645 assert_eq!(axon.neuron_name(), "dev.plexo.DebugStruct.debug");
646 }
647
648 #[tokio::test]
649 async fn test_axon_transmit_with_plexus() {
650 let ns = test_namespace();
651 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
652 let neuron_arc = neuron_impl.clone_to_arc();
653
654 let mut plexus = Plexus::new(vec![], vec![]).await;
656
657 plexus
659 .adapt(neuron_arc.clone())
660 .await
661 .expect("Failed to adapt neuron");
662
663 let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
665 let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
666
667 let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
669 Arc::new(Mutex::new(plexus));
670 let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
671
672 let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> = vec![
673 Box::new(TokioMpscReactant::new(tx1)),
674 Box::new(TokioMpscReactant::new(tx2)),
675 ];
676
677 axon.react(reactants).await.expect("Failed to react");
678
679 let test_data = DebugStruct {
681 foo: 42,
682 bar: "test transmission".to_string(),
683 };
684
685 let trace_context = TraceContext::new();
687 let correlation_id = trace_context.correlation_id;
688 axon.transmit_with_trace(test_data.clone(), trace_context)
689 .await
690 .expect("Failed to transmit");
691
692 let received1 = rx1.recv().await.expect("Failed to receive payload 1");
694 let received2 = rx2.recv().await.expect("Failed to receive payload 2");
695
696 assert_eq!(received1.value.foo, 42);
697 assert_eq!(received1.value.bar, "test transmission");
698 assert_eq!(received2.value.foo, 42);
699 assert_eq!(received2.value.bar, "test transmission");
700 assert_eq!(received1.correlation_id(), correlation_id);
701 assert_eq!(received2.correlation_id(), correlation_id);
702 }
703
704 #[tokio::test]
705 async fn test_axon_transmit_erased() {
706 let ns = test_namespace();
707 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
708 let neuron_arc = neuron_impl.clone_to_arc();
709
710 let mut plexus = Plexus::new(vec![], vec![]).await;
712
713 plexus
715 .adapt(neuron_arc.clone())
716 .await
717 .expect("Failed to adapt neuron");
718
719 let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
721
722 let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
724 Arc::new(Mutex::new(plexus));
725 let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
726
727 let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
728 vec![Box::new(TokioMpscReactant::new(tx))];
729
730 axon.react(reactants).await.expect("Failed to react");
731
732 let test_data = DebugStruct {
734 foo: 99,
735 bar: "erased test".to_string(),
736 };
737
738 let trace_context = TraceContext::new();
740 let correlation_id = trace_context.correlation_id;
741 axon.transmit_erased_with_trace(test_data.clone(), trace_context)
742 .await
743 .expect("Failed to transmit erased");
744
745 let received = rx.recv().await.expect("Failed to receive payload");
747
748 assert_eq!(received.value.foo, 99);
749 assert_eq!(received.value.bar, "erased test");
750 assert_eq!(received.correlation_id(), correlation_id);
751 }
752
753 #[tokio::test]
754 async fn test_axon_transmit_without_correlation_id() {
755 let ns = test_namespace();
756 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
757 let neuron_arc = neuron_impl.clone_to_arc();
758
759 let mut plexus = Plexus::new(vec![], vec![]).await;
761
762 plexus
764 .adapt(neuron_arc.clone())
765 .await
766 .expect("Failed to adapt neuron");
767
768 let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
770
771 let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
773 Arc::new(Mutex::new(plexus));
774 let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
775
776 let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
777 vec![Box::new(TokioMpscReactant::new(tx))];
778
779 axon.react(reactants).await.expect("Failed to react");
780
781 let test_data = DebugStruct {
783 foo: 123,
784 bar: "auto correlation".to_string(),
785 };
786
787 axon.transmit(test_data.clone())
789 .await
790 .expect("Failed to transmit");
791
792 let received = rx.recv().await.expect("Failed to receive payload");
794
795 assert_eq!(received.value.foo, 123);
796 assert_eq!(received.value.bar, "auto correlation");
797 assert_ne!(received.correlation_id(), Uuid::nil());
799 }
800
801 #[tokio::test]
802 async fn test_axon_multiple_transmissions() {
803 let ns = test_namespace();
804 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
805 let neuron_arc = neuron_impl.clone_to_arc();
806
807 let mut plexus = Plexus::new(vec![], vec![]).await;
809
810 plexus
812 .adapt(neuron_arc.clone())
813 .await
814 .expect("Failed to adapt neuron");
815
816 let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
818
819 let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
821 Arc::new(Mutex::new(plexus));
822 let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
823
824 let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
825 vec![Box::new(TokioMpscReactant::new(tx))];
826
827 axon.react(reactants).await.expect("Failed to react");
828
829 for i in 0..5 {
831 let test_data = DebugStruct {
832 foo: i,
833 bar: format!("transmission {i}"),
834 };
835
836 axon.transmit(test_data)
837 .await
838 .expect("Failed to transmit");
839 }
840
841 for i in 0..5 {
843 let received = rx
844 .recv()
845 .await
846 .unwrap_or_else(|| panic!("Failed to receive payload {i}"));
847
848 assert_eq!(received.value.foo, i);
849 assert_eq!(received.value.bar, format!("transmission {i}"));
850 }
851 }
852
853 #[tokio::test]
854 async fn test_axon_transmit_with_plexus_external_ganglion() {
855 use crate::erasure::reactant::erase_reactant_raw;
856 use crate::ganglion::GanglionExternal;
857 use crate::neuron::NeuronImpl;
858 use crate::payload::PayloadRaw;
859 use crate::plexus::Plexus;
860 use crate::test_utils::{
861 DebugCodec, DebugStruct, GanglionExternalInprocess, TokioMpscReactantRaw,
862 test_namespace,
863 };
864 use tokio::sync::mpsc::channel;
865 use tokio::time::{Duration, sleep};
866
867 let ns = test_namespace();
868 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
869 let neuron_arc = neuron_impl.clone_to_arc();
870
871 let mut plexus = Plexus::new(vec![], vec![]).await;
873
874 plexus
876 .adapt(neuron_arc.clone())
877 .await
878 .expect("Failed to adapt neuron");
879
880 let (tx_raw, mut rx_raw) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(2);
882
883 let mut external_ganglion = GanglionExternalInprocess::new();
884
885 external_ganglion
887 .adapt(neuron_arc.clone())
888 .await
889 .expect("Failed to adapt neuron to external ganglion");
890
891 let raw_reactants = vec![erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(
893 TokioMpscReactantRaw::new(tx_raw),
894 ))];
895 external_ganglion
896 .react(neuron_arc.name(), vec![], raw_reactants, vec![])
897 .await
898 .expect("Failed to react with external ganglion");
899
900 let external_ganglion_arc = Arc::new(Mutex::new(external_ganglion));
902 let _ = plexus
903 .infuse_external_ganglion(external_ganglion_arc.clone())
904 .await;
905
906 let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
908 Arc::new(Mutex::new(plexus));
909 let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
910
911 let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
913 vec![];
914 axon.react(reactants).await.expect("Failed to react");
915
916 let test_data = DebugStruct {
918 foo: 99,
919 bar: "external ganglion test".to_string(),
920 };
921
922 let trace_context = TraceContext::new();
924 let correlation_id = trace_context.correlation_id;
925 axon.transmit_with_trace(test_data.clone(), trace_context)
926 .await
927 .expect("Failed to transmit");
928
929 sleep(Duration::from_millis(100)).await;
931
932 assert_eq!(
934 rx_raw.len(),
935 1,
936 "External ganglion should have received exactly one payload"
937 );
938 let received_raw = rx_raw
939 .recv()
940 .await
941 .expect("Failed to receive raw payload from external ganglion");
942
943 let decoded_data = DebugCodec::decode(&received_raw.value).expect("Failed to decode");
945 assert_eq!(decoded_data.foo, 99);
946 assert_eq!(decoded_data.bar, "external ganglion test");
947 assert_eq!(received_raw.correlation_id(), correlation_id);
948 }
949
950 #[tokio::test]
951 async fn test_axon_transmit_simple() {
952 let ns = test_namespace();
953 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
954 let neuron_arc = neuron_impl.clone_to_arc();
955
956 let mut plexus = Plexus::new(vec![], vec![]).await;
957 plexus
958 .adapt(neuron_arc.clone())
959 .await
960 .expect("Failed to adapt neuron");
961
962 let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
963 let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
964 Arc::new(Mutex::new(plexus));
965 let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
966
967 let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
968 vec![Box::new(TokioMpscReactant::new(tx))];
969 axon.react(reactants).await.expect("Failed to react");
970
971 let test_data = DebugStruct {
972 foo: 555,
973 bar: "simple transmission".to_string(),
974 };
975
976 axon.transmit(test_data.clone())
978 .await
979 .expect("Failed to transmit simple");
980
981 let received = rx.recv().await.expect("Failed to receive payload");
982 assert_eq!(received.value.foo, 555);
983 assert_eq!(received.value.bar, "simple transmission");
984 assert_ne!(received.correlation_id(), Uuid::nil());
985 }
986
987 #[tokio::test]
988 async fn test_axon_transmit_with_timeout() {
989 let ns = test_namespace();
990 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
991 let neuron_arc = neuron_impl.clone_to_arc();
992
993 let mut plexus = Plexus::new(vec![], vec![]).await;
994 plexus
995 .adapt(neuron_arc.clone())
996 .await
997 .expect("Failed to adapt neuron");
998
999 let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
1000 let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
1001 Arc::new(Mutex::new(plexus));
1002 let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
1003
1004 let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
1005 vec![Box::new(TokioMpscReactant::new(tx))];
1006 axon.react(reactants).await.expect("Failed to react");
1007
1008 let test_data = DebugStruct {
1009 foo: 777,
1010 bar: "timeout test".to_string(),
1011 };
1012
1013 let timeout_duration = Duration::from_secs(1);
1015 axon.transmit_with_timeout(test_data.clone(), timeout_duration)
1016 .await
1017 .expect("Failed to transmit with timeout");
1018
1019 let received = rx.recv().await.expect("Failed to receive payload");
1020 assert_eq!(received.value.foo, 777);
1021 assert_eq!(received.value.bar, "timeout test");
1022 }
1023
1024 #[tokio::test]
1025 async fn test_axon_transmit_batch() {
1026 let ns = test_namespace();
1027 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1028 let neuron_arc = neuron_impl.clone_to_arc();
1029
1030 let mut plexus = Plexus::new(vec![], vec![]).await;
1031 plexus
1032 .adapt(neuron_arc.clone())
1033 .await
1034 .expect("Failed to adapt neuron");
1035
1036 let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
1037 let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
1038 Arc::new(Mutex::new(plexus));
1039 let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
1040
1041 let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
1042 vec![Box::new(TokioMpscReactant::new(tx))];
1043 axon.react(reactants).await.expect("Failed to react");
1044
1045 let batch_data = vec![
1047 DebugStruct {
1048 foo: 1,
1049 bar: "batch item 1".to_string(),
1050 },
1051 DebugStruct {
1052 foo: 2,
1053 bar: "batch item 2".to_string(),
1054 },
1055 DebugStruct {
1056 foo: 3,
1057 bar: "batch item 3".to_string(),
1058 },
1059 ];
1060
1061 let results = axon
1063 .transmit_batch(batch_data.clone())
1064 .await
1065 .expect("Failed to transmit batch");
1066 assert_eq!(results.len(), 3);
1067
1068 for i in 0..3 {
1070 let received = rx
1071 .recv()
1072 .await
1073 .unwrap_or_else(|| panic!("Failed to receive batch item {i}"));
1074 assert_eq!(received.value.foo, (i + 1));
1075 assert_eq!(received.value.bar, format!("batch item {}", i + 1));
1076 assert_ne!(received.correlation_id(), Uuid::nil());
1080 }
1081 }
1082
1083 #[tokio::test]
1084 async fn test_axon_status_and_validation() {
1085 let ns = test_namespace();
1086 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1087 let neuron_arc = neuron_impl.clone_to_arc();
1088
1089 let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
1090 Arc::new(Mutex::new(GanglionInprocess::new()));
1091 let axon = AxonImpl::new(neuron_arc, ganglion);
1092
1093 assert!(axon.is_ready());
1095
1096 let status = axon.status();
1098 assert!(status.contains("dev.plexo.DebugStruct.debug"));
1099
1100 assert!(axon.validate().is_ok());
1102
1103 assert_eq!(axon.neuron_name(), "dev.plexo.DebugStruct.debug");
1105 }
1106
1107 #[tokio::test]
1108 async fn test_axon_builder() {
1109 let ns = test_namespace();
1110 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1111 let neuron_arc = neuron_impl.clone_to_arc();
1112
1113 let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
1114 Arc::new(Mutex::new(GanglionInprocess::new()));
1115
1116 let axon = AxonImpl::builder()
1118 .with_neuron(neuron_arc.clone())
1119 .with_ganglion(ganglion.clone())
1120 .build()
1121 .expect("Failed to build axon");
1122
1123 assert_eq!(axon.neuron_name(), "dev.plexo.DebugStruct.debug");
1124 assert!(axon.is_ready());
1125
1126 let result = AxonImpl::<DebugStruct, DebugCodec>::builder()
1128 .with_neuron(neuron_arc.clone())
1129 .build();
1130 assert!(result.is_err());
1131 assert!(result.unwrap_err().contains("Ganglion is required"));
1132
1133 let axon_unchecked = AxonImpl::builder()
1135 .with_neuron(neuron_arc.clone())
1136 .with_ganglion(ganglion.clone())
1137 .build_unchecked()
1138 .expect("Failed to build unchecked axon");
1139
1140 assert_eq!(axon_unchecked.neuron_name(), "dev.plexo.DebugStruct.debug");
1141 }
1142
1143 #[tokio::test]
1144 async fn test_axon_additional_methods() {
1145 let ns = test_namespace();
1146 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1147 let neuron_arc = neuron_impl.clone_to_arc();
1148
1149 let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
1150 Arc::new(Mutex::new(GanglionInprocess::new()));
1151 let axon = AxonImpl::new(neuron_arc.clone(), ganglion.clone());
1152
1153 let cloned_neuron = axon.clone_neuron();
1155 assert_eq!(cloned_neuron.name(), neuron_arc.name());
1156
1157 let ganglion_id = axon.ganglion_id().await;
1159 assert_ne!(ganglion_id, Uuid::nil());
1160
1161 assert_eq!(axon.neuron().name(), "dev.plexo.DebugStruct.debug");
1163 }
1164
1165 #[test]
1166 fn test_axon_error_variants_and_conversion() {
1167 let ganglion_id = Uuid::now_v7();
1168 let ganglion_name = "TestGanglion".to_string();
1169 let neuron_name = "test_neuron".to_string();
1170
1171 let ganglion_synapse_not_found = GanglionError::SynapseNotFound {
1173 neuron_name: neuron_name.clone(),
1174 ganglion_name: ganglion_name.clone(),
1175 ganglion_id,
1176 };
1177 let axon_neuron_not_adapted = AxonError::from_ganglion_error(ganglion_synapse_not_found);
1178 match axon_neuron_not_adapted {
1179 AxonError::NeuronNotAdapted {
1180 neuron_name: n,
1181 ganglion_name: g,
1182 ganglion_id: id,
1183 } => {
1184 assert_eq!(n, neuron_name);
1185 assert_eq!(g, ganglion_name);
1186 assert_eq!(id, ganglion_id);
1187 }
1188 _ => panic!("Expected NeuronNotAdapted variant"),
1189 }
1190
1191 let ganglion_synapse_lock = GanglionError::SynapseLock {
1193 neuron_name: neuron_name.clone(),
1194 ganglion_name: ganglion_name.clone(),
1195 ganglion_id,
1196 };
1197 let axon_synapse_lock = AxonError::from_ganglion_error(ganglion_synapse_lock);
1198 match axon_synapse_lock {
1199 AxonError::SynapseLock {
1200 neuron_name: n,
1201 ganglion_name: g,
1202 ganglion_id: id,
1203 } => {
1204 assert_eq!(n, neuron_name);
1205 assert_eq!(g, ganglion_name);
1206 assert_eq!(id, ganglion_id);
1207 }
1208 _ => panic!("Expected SynapseLockError variant"),
1209 }
1210
1211 let ganglion_transmit = GanglionError::Transmit {
1213 neuron_name: neuron_name.clone(),
1214 ganglion_name: ganglion_name.clone(),
1215 ganglion_id,
1216 message: "test transmit error".to_string(),
1217 };
1218 let axon_transmit = AxonError::from_ganglion_error(ganglion_transmit);
1219 match axon_transmit {
1220 AxonError::Transmit {
1221 neuron_name: n,
1222 ganglion_name: g,
1223 ganglion_id: id,
1224 message: m,
1225 } => {
1226 assert_eq!(n, neuron_name);
1227 assert_eq!(g, ganglion_name);
1228 assert_eq!(id, ganglion_id);
1229 assert_eq!(m, "test transmit error");
1230 }
1231 _ => panic!("Expected Transmit variant"),
1232 }
1233
1234 let direct_neuron_not_adapted = AxonError::NeuronNotAdapted {
1236 neuron_name: neuron_name.clone(),
1237 ganglion_name: ganglion_name.clone(),
1238 ganglion_id,
1239 };
1240 assert!(direct_neuron_not_adapted.to_string().contains(&neuron_name));
1241 assert!(
1242 direct_neuron_not_adapted
1243 .to_string()
1244 .contains(&ganglion_name)
1245 );
1246
1247 let direct_transmit = AxonError::Transmit {
1248 neuron_name: neuron_name.clone(),
1249 ganglion_name: ganglion_name.clone(),
1250 ganglion_id,
1251 message: "direct message".to_string(),
1252 };
1253 assert!(direct_transmit.to_string().contains(&neuron_name));
1254 assert!(direct_transmit.to_string().contains(&ganglion_name));
1255 assert!(direct_transmit.to_string().contains("direct message"));
1256 }
1257}