1use crate::codec::{Codec, CodecName};
14use crate::erasure::payload::erase_payload;
15use crate::erasure::reactant::ReactantErased;
16use crate::ganglion::{GanglionError, GanglionInternal};
17use crate::logging::TraceContext;
18use crate::neuron::Neuron;
19use crate::payload::Payload;
20use crate::reactant::Reactant;
21use std::future::Future;
22use std::pin::Pin;
23use std::sync::Arc;
24use std::time::Duration;
25use thiserror::Error;
26use tokio::sync::Mutex;
27use tokio::time::timeout;
28use tracing::Instrument;
29use uuid::Uuid;
30
31#[derive(Error, Debug)]
33pub enum AxonError {
34 #[error(
35 "Neuron needs to be adapted to ganglion {ganglion_name} (id: {ganglion_id}): {neuron_name}"
36 )]
37 NeuronNotAdapted {
38 neuron_name: String,
39 ganglion_name: String,
40 ganglion_id: Uuid,
41 },
42 #[error(
43 "Failed to acquire lock on synapse: {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})"
44 )]
45 SynapseLock {
46 neuron_name: String,
47 ganglion_name: String,
48 ganglion_id: Uuid,
49 },
50 #[error(
51 "Transmission error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id}): {message}"
52 )]
53 Transmit {
54 neuron_name: String,
55 ganglion_name: String,
56 ganglion_id: Uuid,
57 message: String,
58 },
59 #[error("Encode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
60 Encode {
61 neuron_name: String,
62 ganglion_name: String,
63 ganglion_id: Uuid,
64 },
65 #[error("Decode error for neuron {neuron_name} (ganglion: {ganglion_name}, id: {ganglion_id})")]
66 Decode {
67 neuron_name: String,
68 ganglion_name: String,
69 ganglion_id: Uuid,
70 },
71 #[error("Transmission timeout")]
72 TransmissionTimeout,
73 #[error("Ganglion error: {0}")]
74 GanglionError(#[from] GanglionError),
75}
76
77impl AxonError {
78 pub fn from_ganglion_error(error: GanglionError) -> Self {
80 match error {
81 GanglionError::SynapseNotFound {
82 neuron_name,
83 ganglion_name,
84 ganglion_id,
85 } => AxonError::NeuronNotAdapted {
86 neuron_name,
87 ganglion_name,
88 ganglion_id,
89 },
90 GanglionError::SynapseLock {
91 neuron_name,
92 ganglion_name,
93 ganglion_id,
94 } => AxonError::SynapseLock {
95 neuron_name,
96 ganglion_name,
97 ganglion_id,
98 },
99 GanglionError::Transmit {
100 neuron_name,
101 ganglion_name,
102 ganglion_id,
103 message,
104 } => AxonError::Transmit {
105 neuron_name,
106 ganglion_name,
107 ganglion_id,
108 message,
109 },
110 GanglionError::Encode {
111 neuron_name,
112 ganglion_name,
113 ganglion_id,
114 } => AxonError::Encode {
115 neuron_name,
116 ganglion_name,
117 ganglion_id,
118 },
119 GanglionError::Decode {
120 neuron_name,
121 ganglion_name,
122 ganglion_id,
123 } => AxonError::Decode {
124 neuron_name,
125 ganglion_name,
126 ganglion_id,
127 },
128 GanglionError::Adapt { .. } | GanglionError::QueueFull { .. } => {
129 AxonError::GanglionError(error)
131 }
132 }
133 }
134}
135
136pub trait Axon<T, C>: Send + Sync
138where
139 T: Send + Sync + 'static,
140 C: Codec<T> + CodecName + Send + Sync + 'static,
141{
142 fn react(
144 &mut self,
145 reactants: Vec<Box<dyn Reactant<T, C> + Send + Sync + 'static>>,
146 ) -> Pin<Box<dyn Future<Output = Result<(), AxonError>> + Send + '_>>;
147
148 fn transmit(
150 &mut self,
151 data: T,
152 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;
153
154 fn transmit_with_trace(
156 &mut self,
157 data: T,
158 trace_context: TraceContext,
159 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;
160
161 fn transmit_erased(
163 &mut self,
164 data: T,
165 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;
166
167 fn transmit_erased_with_trace(
169 &mut self,
170 data: T,
171 trace_context: TraceContext,
172 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>>;
173
174 fn neuron_name(&self) -> String;
176
177 fn transmit_with_timeout(
179 &mut self,
180 data: T,
181 timeout_duration: Duration,
182 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
183 Box::pin(async move {
184 match timeout(timeout_duration, self.transmit(data)).await {
185 Ok(result) => result,
186 Err(_) => Err(AxonError::TransmissionTimeout),
187 }
188 })
189 }
190
191 fn transmit_with_trace_with_timeout(
193 &mut self,
194 data: T,
195 trace_context: TraceContext,
196 timeout_duration: Duration,
197 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
198 Box::pin(async move {
199 match timeout(timeout_duration, self.transmit_with_trace(data, trace_context)).await {
200 Ok(result) => result,
201 Err(_) => Err(AxonError::TransmissionTimeout),
202 }
203 })
204 }
205
206 #[allow(clippy::type_complexity)]
208 fn transmit_batch(
209 &mut self,
210 data_items: Vec<T>,
211 ) -> Pin<Box<dyn Future<Output = Result<Vec<Vec<()>>, AxonError>> + Send + '_>> {
212 Box::pin(async move {
213 let mut results = Vec::new();
214 for item in data_items {
215 match self.transmit(item).await {
216 Ok(result) => results.push(result),
217 Err(e) => return Err(e),
218 }
219 }
220 Ok(results)
221 })
222 }
223
224 #[allow(clippy::type_complexity)]
226 fn transmit_batch_with_trace(
227 &mut self,
228 data_items: Vec<T>,
229 trace_context: TraceContext,
230 ) -> Pin<Box<dyn Future<Output = Result<Vec<Vec<()>>, AxonError>> + Send + '_>> {
231 Box::pin(async move {
232 let mut results = Vec::new();
233 for item in data_items {
234 match self.transmit_with_trace(item, trace_context).await {
235 Ok(result) => results.push(result),
236 Err(e) => return Err(e),
237 }
238 }
239 Ok(results)
240 })
241 }
242
243 fn is_ready(&self) -> bool {
245 !self.neuron_name().is_empty()
246 }
247
248 fn status(&self) -> String {
250 format!("Axon[neuron: {}]", self.neuron_name())
251 }
252}
253
254pub struct AxonImpl<T, C>
256where
257 T: Send + Sync + 'static,
258 C: Codec<T> + CodecName + Send + Sync + 'static,
259{
260 neuron: Arc<dyn Neuron<T, C> + Send + Sync + 'static>,
262
263 ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>,
265}
266
267impl<T, C> Clone for AxonImpl<T, C>
268where
269 T: Send + Sync + 'static,
270 C: Codec<T> + CodecName + Send + Sync + 'static,
271{
272 fn clone(&self) -> Self {
273 Self {
274 neuron: self.neuron.clone(),
275 ganglion: self.ganglion.clone(),
276 }
277 }
278}
279
280impl<T, C> AxonImpl<T, C>
281where
282 T: Send + Sync + 'static,
283 C: Codec<T> + CodecName + Send + Sync + 'static,
284{
285 pub fn new(
287 neuron: Arc<dyn Neuron<T, C> + Send + Sync + 'static>,
288 ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>,
289 ) -> Self {
290 Self { neuron, ganglion }
291 }
292
293 pub fn builder() -> AxonBuilder<T, C> {
295 AxonBuilder::new()
296 }
297
298 pub fn neuron(&self) -> &Arc<dyn Neuron<T, C> + Send + Sync + 'static> {
300 &self.neuron
301 }
302
303 pub fn ganglion(&self) -> &Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>> {
305 &self.ganglion
306 }
307
308 pub fn clone_neuron(&self) -> Arc<dyn Neuron<T, C> + Send + Sync + 'static> {
310 self.neuron.clone()
311 }
312
313 pub fn clone_ganglion(&self) -> Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>> {
315 self.ganglion.clone()
316 }
317
318 pub async fn ganglion_id(&self) -> Uuid {
320 self.ganglion.lock().await.unique_id()
321 }
322
323 pub fn validate(&self) -> Result<(), String> {
325 if self.neuron.name().is_empty() {
326 return Err("Neuron name is empty".to_string());
327 }
328 Ok(())
329 }
330}
331
332pub struct AxonBuilder<T, C>
334where
335 T: Send + Sync + 'static,
336 C: Codec<T> + CodecName + Send + Sync + 'static,
337{
338 neuron: Option<Arc<dyn Neuron<T, C> + Send + Sync + 'static>>,
339 ganglion: Option<Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>,
340}
341
342impl<T, C> AxonBuilder<T, C>
343where
344 T: Send + Sync + 'static,
345 C: Codec<T> + CodecName + Send + Sync + 'static,
346{
347 pub fn new() -> Self {
349 Self {
350 neuron: None,
351 ganglion: None,
352 }
353 }
354
355 pub fn with_neuron(mut self, neuron: Arc<dyn Neuron<T, C> + Send + Sync + 'static>) -> Self {
357 self.neuron = Some(neuron);
358 self
359 }
360
361 pub fn with_ganglion(
363 mut self,
364 ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>,
365 ) -> Self {
366 self.ganglion = Some(ganglion);
367 self
368 }
369
370 pub fn build(self) -> Result<AxonImpl<T, C>, String> {
372 let neuron = self.neuron.ok_or("Neuron is required")?;
373 let ganglion = self.ganglion.ok_or("Ganglion is required")?;
374
375 let axon = AxonImpl::new(neuron, ganglion);
376 axon.validate()?;
377 Ok(axon)
378 }
379
380 pub fn build_unchecked(self) -> Result<AxonImpl<T, C>, String> {
382 let neuron = self.neuron.ok_or("Neuron is required")?;
383 let ganglion = self.ganglion.ok_or("Ganglion is required")?;
384
385 Ok(AxonImpl::new(neuron, ganglion))
386 }
387}
388
389impl<T, C> Default for AxonBuilder<T, C>
390where
391 T: Send + Sync + 'static,
392 C: Codec<T> + CodecName + Send + Sync + 'static,
393{
394 fn default() -> Self {
395 Self::new()
396 }
397}
398
399impl<T, C> Axon<T, C> for AxonImpl<T, C>
400where
401 T: Send + Sync + 'static,
402 C: Codec<T> + CodecName + Send + Sync + 'static,
403{
404 fn react(
405 &mut self,
406 reactants: Vec<Box<dyn Reactant<T, C> + Send + Sync + 'static>>,
407 ) -> Pin<Box<dyn Future<Output = Result<(), AxonError>> + Send + '_>> {
408 let neuron_name = self.neuron.name();
409 let ganglion = self.ganglion.clone();
410
411 Box::pin(async move {
412 let erased_reactants: Vec<Arc<dyn ReactantErased + Send + Sync + 'static>> = reactants
414 .into_iter()
415 .map(|reactant| reactant.erase())
416 .collect();
417
418 let future = {
419 let mut ganglion_guard = ganglion.lock().await;
420 ganglion_guard.react(neuron_name, erased_reactants, vec![])
421 };
422 match future.await {
423 Ok(result) => Ok(result),
424 Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
425 }
426 })
427 }
428
429 fn transmit(
430 &mut self,
431 data: T,
432 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
433 let neuron = self.neuron.clone();
434 let ganglion = self.ganglion.clone();
435
436 Box::pin(async move {
437 let payload = Payload::new(data, neuron.clone());
439
440 let future = {
442 let mut ganglion_guard = ganglion.lock().await;
443 ganglion_guard.transmit(erase_payload(payload))
444 };
445 match future.await {
446 Ok(result) => Ok(result),
447 Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
448 }
449 })
450 }
451
452 fn transmit_with_trace(
453 &mut self,
454 data: T,
455 trace_context: TraceContext,
456 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
457 let neuron = self.neuron.clone();
458 let ganglion = self.ganglion.clone();
459
460 Box::pin(async move {
461 let payload = Arc::new(Payload::from_parts(
463 Arc::new(data),
464 neuron.clone(),
465 trace_context.new_child(),
466 ));
467
468 let future = {
470 let mut ganglion_guard = ganglion.lock().await;
471 ganglion_guard.transmit(erase_payload(payload))
472 };
473 match future.await {
474 Ok(result) => Ok(result),
475 Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
476 }
477 })
478 }
479
480 fn transmit_erased(
481 &mut self,
482 data: T,
483 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
484 let neuron = self.neuron.clone();
485 let ganglion = self.ganglion.clone();
486
487 Box::pin(
488 async move {
489 let payload = Payload::new(data, neuron.clone());
491 let erased_payload = erase_payload(payload);
492 let span = erased_payload.span_debug("AxonImpl::transmit_erased");
493
494 async move {
495 tracing::debug!("Starting erased transmission");
496 let future = {
497 let mut ganglion_guard = ganglion.lock().await;
498 ganglion_guard.transmit(erased_payload)
499 };
500 match future.await {
501 Ok(result) => Ok(result),
502 Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
503 }
504 }
505 .instrument(span)
506 .await
507 },
508 )
509 }
510
511 fn transmit_erased_with_trace(
512 &mut self,
513 data: T,
514 trace_context: TraceContext,
515 ) -> Pin<Box<dyn Future<Output = Result<Vec<()>, AxonError>> + Send + '_>> {
516 let neuron = self.neuron.clone();
517 let ganglion = self.ganglion.clone();
518
519 Box::pin(async move {
520 let payload = Arc::new(Payload::from_parts(
522 Arc::new(data),
523 neuron.clone(),
524 trace_context.new_child(),
525 ));
526
527 let erased_payload = erase_payload(payload);
529 let span = erased_payload.span_debug("AxonImpl::transmit_erased_with_trace");
530
531 async move {
532 tracing::debug!("Starting erased transmission with trace");
533
534 let future = {
535 let mut ganglion_guard = ganglion.lock().await;
536 ganglion_guard.transmit(erased_payload)
537 };
538 match future.await {
539 Ok(result) => Ok(result),
540 Err(ganglion_error) => Err(AxonError::from_ganglion_error(ganglion_error)),
541 }
542 }
543 .instrument(span)
544 .await
545 })
546 }
547
548 fn neuron_name(&self) -> String {
549 self.neuron.name()
550 }
551}
552
553impl<T, C> std::fmt::Debug for AxonImpl<T, C>
554where
555 T: Send + Sync + 'static,
556 C: Codec<T> + CodecName + Send + Sync + 'static,
557{
558 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
559 f.debug_struct("AxonImpl")
560 .field("neuron", &self.neuron.name())
561 .finish()
562 }
563}
564
565#[cfg(test)]
566mod tests {
567 use super::*;
568 use crate::ganglion::{Ganglion, GanglionInprocess};
569 use crate::neuron::NeuronImpl;
570 use crate::payload::Payload;
571 use crate::plexus::Plexus;
572 use crate::test_utils::{DebugCodec, DebugStruct, TokioMpscReactant, test_namespace};
573 use tokio::sync::{Mutex, mpsc::channel};
574 use uuid::Uuid;
575
576 #[tokio::test]
577 async fn test_axon_creation() {
578 let ns = test_namespace();
579 let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> =
580 Arc::new(NeuronImpl::new(ns));
581 let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
582 Arc::new(Mutex::new(GanglionInprocess::new()));
583
584 let axon = AxonImpl::new(neuron, ganglion);
585
586 assert_eq!(axon.neuron_name(), "dev.plexo.DebugStruct.debug");
587 }
588
589 #[tokio::test]
590 async fn test_axon_transmit_with_plexus() {
591 let ns = test_namespace();
592 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
593 let neuron_arc = neuron_impl.clone_to_arc();
594
595 let mut plexus = Plexus::new(vec![], vec![]).await;
597
598 plexus
600 .adapt(neuron_arc.clone())
601 .await
602 .expect("Failed to adapt neuron");
603
604 let (tx1, mut rx1) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
606 let (tx2, mut rx2) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(2);
607
608 let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
610 Arc::new(Mutex::new(plexus));
611 let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
612
613 let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> = vec![
614 Box::new(TokioMpscReactant::new(tx1)),
615 Box::new(TokioMpscReactant::new(tx2)),
616 ];
617
618 axon.react(reactants).await.expect("Failed to react");
619
620 let test_data = DebugStruct {
622 foo: 42,
623 bar: "test transmission".to_string(),
624 };
625
626 let trace_context = TraceContext::new();
628 let correlation_id = trace_context.correlation_id;
629 axon.transmit_with_trace(test_data.clone(), trace_context)
630 .await
631 .expect("Failed to transmit");
632
633 let received1 = rx1.recv().await.expect("Failed to receive payload 1");
635 let received2 = rx2.recv().await.expect("Failed to receive payload 2");
636
637 assert_eq!(received1.value.foo, 42);
638 assert_eq!(received1.value.bar, "test transmission");
639 assert_eq!(received2.value.foo, 42);
640 assert_eq!(received2.value.bar, "test transmission");
641 assert_eq!(received1.correlation_id(), correlation_id);
642 assert_eq!(received2.correlation_id(), correlation_id);
643 }
644
645 #[tokio::test]
646 async fn test_axon_transmit_erased() {
647 let ns = test_namespace();
648 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
649 let neuron_arc = neuron_impl.clone_to_arc();
650
651 let mut plexus = Plexus::new(vec![], vec![]).await;
653
654 plexus
656 .adapt(neuron_arc.clone())
657 .await
658 .expect("Failed to adapt neuron");
659
660 let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
662
663 let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
665 Arc::new(Mutex::new(plexus));
666 let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
667
668 let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
669 vec![Box::new(TokioMpscReactant::new(tx))];
670
671 axon.react(reactants).await.expect("Failed to react");
672
673 let test_data = DebugStruct {
675 foo: 99,
676 bar: "erased test".to_string(),
677 };
678
679 let trace_context = TraceContext::new();
681 let correlation_id = trace_context.correlation_id;
682 axon.transmit_erased_with_trace(test_data.clone(), trace_context)
683 .await
684 .expect("Failed to transmit erased");
685
686 let received = rx.recv().await.expect("Failed to receive payload");
688
689 assert_eq!(received.value.foo, 99);
690 assert_eq!(received.value.bar, "erased test");
691 assert_eq!(received.correlation_id(), correlation_id);
692 }
693
694 #[tokio::test]
695 async fn test_axon_transmit_without_correlation_id() {
696 let ns = test_namespace();
697 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
698 let neuron_arc = neuron_impl.clone_to_arc();
699
700 let mut plexus = Plexus::new(vec![], vec![]).await;
702
703 plexus
705 .adapt(neuron_arc.clone())
706 .await
707 .expect("Failed to adapt neuron");
708
709 let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
711
712 let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
714 Arc::new(Mutex::new(plexus));
715 let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
716
717 let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
718 vec![Box::new(TokioMpscReactant::new(tx))];
719
720 axon.react(reactants).await.expect("Failed to react");
721
722 let test_data = DebugStruct {
724 foo: 123,
725 bar: "auto correlation".to_string(),
726 };
727
728 axon.transmit(test_data.clone())
730 .await
731 .expect("Failed to transmit");
732
733 let received = rx.recv().await.expect("Failed to receive payload");
735
736 assert_eq!(received.value.foo, 123);
737 assert_eq!(received.value.bar, "auto correlation");
738 assert_ne!(received.correlation_id(), Uuid::nil());
740 }
741
742 #[tokio::test]
743 async fn test_axon_multiple_transmissions() {
744 let ns = test_namespace();
745 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
746 let neuron_arc = neuron_impl.clone_to_arc();
747
748 let mut plexus = Plexus::new(vec![], vec![]).await;
750
751 plexus
753 .adapt(neuron_arc.clone())
754 .await
755 .expect("Failed to adapt neuron");
756
757 let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
759
760 let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
762 Arc::new(Mutex::new(plexus));
763 let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
764
765 let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
766 vec![Box::new(TokioMpscReactant::new(tx))];
767
768 axon.react(reactants).await.expect("Failed to react");
769
770 for i in 0..5 {
772 let test_data = DebugStruct {
773 foo: i,
774 bar: format!("transmission {i}"),
775 };
776
777 axon.transmit(test_data)
778 .await
779 .expect("Failed to transmit");
780 }
781
782 for i in 0..5 {
784 let received = rx
785 .recv()
786 .await
787 .unwrap_or_else(|| panic!("Failed to receive payload {i}"));
788
789 assert_eq!(received.value.foo, i);
790 assert_eq!(received.value.bar, format!("transmission {i}"));
791 }
792 }
793
794 #[tokio::test]
795 async fn test_axon_transmit_with_plexus_external_ganglion() {
796 use crate::erasure::reactant::erase_reactant_raw;
797 use crate::ganglion::GanglionExternal;
798 use crate::neuron::NeuronImpl;
799 use crate::payload::PayloadRaw;
800 use crate::plexus::Plexus;
801 use crate::test_utils::{
802 DebugCodec, DebugStruct, GanglionExternalInprocess, TokioMpscReactantRaw,
803 test_namespace,
804 };
805 use tokio::sync::mpsc::channel;
806 use tokio::time::{Duration, sleep};
807
808 let ns = test_namespace();
809 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
810 let neuron_arc = neuron_impl.clone_to_arc();
811
812 let mut plexus = Plexus::new(vec![], vec![]).await;
814
815 plexus
817 .adapt(neuron_arc.clone())
818 .await
819 .expect("Failed to adapt neuron");
820
821 let (tx_raw, mut rx_raw) = channel::<Arc<PayloadRaw<DebugStruct, DebugCodec>>>(2);
823
824 let mut external_ganglion = GanglionExternalInprocess::new();
825
826 external_ganglion
828 .adapt(neuron_arc.clone())
829 .await
830 .expect("Failed to adapt neuron to external ganglion");
831
832 let raw_reactants = vec![erase_reactant_raw::<DebugStruct, DebugCodec, _>(Box::new(
834 TokioMpscReactantRaw::new(tx_raw),
835 ))];
836 external_ganglion
837 .react(neuron_arc.name(), vec![], raw_reactants, vec![])
838 .await
839 .expect("Failed to react with external ganglion");
840
841 let external_ganglion_arc = Arc::new(Mutex::new(external_ganglion));
843 let _ = plexus
844 .infuse_external_ganglion(external_ganglion_arc.clone())
845 .await;
846
847 let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
849 Arc::new(Mutex::new(plexus));
850 let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
851
852 let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
854 vec![];
855 axon.react(reactants).await.expect("Failed to react");
856
857 let test_data = DebugStruct {
859 foo: 99,
860 bar: "external ganglion test".to_string(),
861 };
862
863 let trace_context = TraceContext::new();
865 let correlation_id = trace_context.correlation_id;
866 axon.transmit_with_trace(test_data.clone(), trace_context)
867 .await
868 .expect("Failed to transmit");
869
870 sleep(Duration::from_millis(100)).await;
872
873 assert_eq!(
875 rx_raw.len(),
876 1,
877 "External ganglion should have received exactly one payload"
878 );
879 let received_raw = rx_raw
880 .recv()
881 .await
882 .expect("Failed to receive raw payload from external ganglion");
883
884 let decoded_data = DebugCodec::decode(&received_raw.value).expect("Failed to decode");
886 assert_eq!(decoded_data.foo, 99);
887 assert_eq!(decoded_data.bar, "external ganglion test");
888 assert_eq!(received_raw.correlation_id(), correlation_id);
889 }
890
891 #[tokio::test]
892 async fn test_axon_transmit_simple() {
893 let ns = test_namespace();
894 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
895 let neuron_arc = neuron_impl.clone_to_arc();
896
897 let mut plexus = Plexus::new(vec![], vec![]).await;
898 plexus
899 .adapt(neuron_arc.clone())
900 .await
901 .expect("Failed to adapt neuron");
902
903 let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
904 let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
905 Arc::new(Mutex::new(plexus));
906 let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
907
908 let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
909 vec![Box::new(TokioMpscReactant::new(tx))];
910 axon.react(reactants).await.expect("Failed to react");
911
912 let test_data = DebugStruct {
913 foo: 555,
914 bar: "simple transmission".to_string(),
915 };
916
917 axon.transmit(test_data.clone())
919 .await
920 .expect("Failed to transmit simple");
921
922 let received = rx.recv().await.expect("Failed to receive payload");
923 assert_eq!(received.value.foo, 555);
924 assert_eq!(received.value.bar, "simple transmission");
925 assert_ne!(received.correlation_id(), Uuid::nil());
926 }
927
928 #[tokio::test]
929 async fn test_axon_transmit_with_timeout() {
930 let ns = test_namespace();
931 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
932 let neuron_arc = neuron_impl.clone_to_arc();
933
934 let mut plexus = Plexus::new(vec![], vec![]).await;
935 plexus
936 .adapt(neuron_arc.clone())
937 .await
938 .expect("Failed to adapt neuron");
939
940 let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(1);
941 let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
942 Arc::new(Mutex::new(plexus));
943 let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
944
945 let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
946 vec![Box::new(TokioMpscReactant::new(tx))];
947 axon.react(reactants).await.expect("Failed to react");
948
949 let test_data = DebugStruct {
950 foo: 777,
951 bar: "timeout test".to_string(),
952 };
953
954 let timeout_duration = Duration::from_secs(1);
956 axon.transmit_with_timeout(test_data.clone(), timeout_duration)
957 .await
958 .expect("Failed to transmit with timeout");
959
960 let received = rx.recv().await.expect("Failed to receive payload");
961 assert_eq!(received.value.foo, 777);
962 assert_eq!(received.value.bar, "timeout test");
963 }
964
965 #[tokio::test]
966 async fn test_axon_transmit_batch() {
967 let ns = test_namespace();
968 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
969 let neuron_arc = neuron_impl.clone_to_arc();
970
971 let mut plexus = Plexus::new(vec![], vec![]).await;
972 plexus
973 .adapt(neuron_arc.clone())
974 .await
975 .expect("Failed to adapt neuron");
976
977 let (tx, mut rx) = channel::<Arc<Payload<DebugStruct, DebugCodec>>>(10);
978 let plexus_ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
979 Arc::new(Mutex::new(plexus));
980 let mut axon = AxonImpl::new(neuron_arc.clone(), plexus_ganglion);
981
982 let reactants: Vec<Box<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync + 'static>> =
983 vec![Box::new(TokioMpscReactant::new(tx))];
984 axon.react(reactants).await.expect("Failed to react");
985
986 let batch_data = vec![
988 DebugStruct {
989 foo: 1,
990 bar: "batch item 1".to_string(),
991 },
992 DebugStruct {
993 foo: 2,
994 bar: "batch item 2".to_string(),
995 },
996 DebugStruct {
997 foo: 3,
998 bar: "batch item 3".to_string(),
999 },
1000 ];
1001
1002 let results = axon
1004 .transmit_batch(batch_data.clone())
1005 .await
1006 .expect("Failed to transmit batch");
1007 assert_eq!(results.len(), 3);
1008
1009 for i in 0..3 {
1011 let received = rx
1012 .recv()
1013 .await
1014 .unwrap_or_else(|| panic!("Failed to receive batch item {i}"));
1015 assert_eq!(received.value.foo, (i + 1));
1016 assert_eq!(received.value.bar, format!("batch item {}", i + 1));
1017 assert_ne!(received.correlation_id(), Uuid::nil());
1021 }
1022 }
1023
1024 #[tokio::test]
1025 async fn test_axon_status_and_validation() {
1026 let ns = test_namespace();
1027 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1028 let neuron_arc = neuron_impl.clone_to_arc();
1029
1030 let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
1031 Arc::new(Mutex::new(GanglionInprocess::new()));
1032 let axon = AxonImpl::new(neuron_arc, ganglion);
1033
1034 assert!(axon.is_ready());
1036
1037 let status = axon.status();
1039 assert!(status.contains("dev.plexo.DebugStruct.debug"));
1040
1041 assert!(axon.validate().is_ok());
1043
1044 assert_eq!(axon.neuron_name(), "dev.plexo.DebugStruct.debug");
1046 }
1047
1048 #[tokio::test]
1049 async fn test_axon_builder() {
1050 let ns = test_namespace();
1051 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1052 let neuron_arc = neuron_impl.clone_to_arc();
1053
1054 let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
1055 Arc::new(Mutex::new(GanglionInprocess::new()));
1056
1057 let axon = AxonImpl::builder()
1059 .with_neuron(neuron_arc.clone())
1060 .with_ganglion(ganglion.clone())
1061 .build()
1062 .expect("Failed to build axon");
1063
1064 assert_eq!(axon.neuron_name(), "dev.plexo.DebugStruct.debug");
1065 assert!(axon.is_ready());
1066
1067 let result = AxonImpl::<DebugStruct, DebugCodec>::builder()
1069 .with_neuron(neuron_arc.clone())
1070 .build();
1071 assert!(result.is_err());
1072 assert!(result.unwrap_err().contains("Ganglion is required"));
1073
1074 let axon_unchecked = AxonImpl::builder()
1076 .with_neuron(neuron_arc.clone())
1077 .with_ganglion(ganglion.clone())
1078 .build_unchecked()
1079 .expect("Failed to build unchecked axon");
1080
1081 assert_eq!(axon_unchecked.neuron_name(), "dev.plexo.DebugStruct.debug");
1082 }
1083
1084 #[tokio::test]
1085 async fn test_axon_additional_methods() {
1086 let ns = test_namespace();
1087 let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns);
1088 let neuron_arc = neuron_impl.clone_to_arc();
1089
1090 let ganglion: Arc<Mutex<dyn GanglionInternal + Send + Sync>> =
1091 Arc::new(Mutex::new(GanglionInprocess::new()));
1092 let axon = AxonImpl::new(neuron_arc.clone(), ganglion.clone());
1093
1094 let cloned_neuron = axon.clone_neuron();
1096 assert_eq!(cloned_neuron.name(), neuron_arc.name());
1097
1098 let ganglion_id = axon.ganglion_id().await;
1100 assert_ne!(ganglion_id, Uuid::nil());
1101
1102 assert_eq!(axon.neuron().name(), "dev.plexo.DebugStruct.debug");
1104 }
1105
1106 #[test]
1107 fn test_axon_error_variants_and_conversion() {
1108 let ganglion_id = Uuid::now_v7();
1109 let ganglion_name = "TestGanglion".to_string();
1110 let neuron_name = "test_neuron".to_string();
1111
1112 let ganglion_synapse_not_found = GanglionError::SynapseNotFound {
1114 neuron_name: neuron_name.clone(),
1115 ganglion_name: ganglion_name.clone(),
1116 ganglion_id,
1117 };
1118 let axon_neuron_not_adapted = AxonError::from_ganglion_error(ganglion_synapse_not_found);
1119 match axon_neuron_not_adapted {
1120 AxonError::NeuronNotAdapted {
1121 neuron_name: n,
1122 ganglion_name: g,
1123 ganglion_id: id,
1124 } => {
1125 assert_eq!(n, neuron_name);
1126 assert_eq!(g, ganglion_name);
1127 assert_eq!(id, ganglion_id);
1128 }
1129 _ => panic!("Expected NeuronNotAdapted variant"),
1130 }
1131
1132 let ganglion_synapse_lock = GanglionError::SynapseLock {
1134 neuron_name: neuron_name.clone(),
1135 ganglion_name: ganglion_name.clone(),
1136 ganglion_id,
1137 };
1138 let axon_synapse_lock = AxonError::from_ganglion_error(ganglion_synapse_lock);
1139 match axon_synapse_lock {
1140 AxonError::SynapseLock {
1141 neuron_name: n,
1142 ganglion_name: g,
1143 ganglion_id: id,
1144 } => {
1145 assert_eq!(n, neuron_name);
1146 assert_eq!(g, ganglion_name);
1147 assert_eq!(id, ganglion_id);
1148 }
1149 _ => panic!("Expected SynapseLockError variant"),
1150 }
1151
1152 let ganglion_transmit = GanglionError::Transmit {
1154 neuron_name: neuron_name.clone(),
1155 ganglion_name: ganglion_name.clone(),
1156 ganglion_id,
1157 message: "test transmit error".to_string(),
1158 };
1159 let axon_transmit = AxonError::from_ganglion_error(ganglion_transmit);
1160 match axon_transmit {
1161 AxonError::Transmit {
1162 neuron_name: n,
1163 ganglion_name: g,
1164 ganglion_id: id,
1165 message: m,
1166 } => {
1167 assert_eq!(n, neuron_name);
1168 assert_eq!(g, ganglion_name);
1169 assert_eq!(id, ganglion_id);
1170 assert_eq!(m, "test transmit error");
1171 }
1172 _ => panic!("Expected Transmit variant"),
1173 }
1174
1175 let direct_neuron_not_adapted = AxonError::NeuronNotAdapted {
1177 neuron_name: neuron_name.clone(),
1178 ganglion_name: ganglion_name.clone(),
1179 ganglion_id,
1180 };
1181 assert!(direct_neuron_not_adapted.to_string().contains(&neuron_name));
1182 assert!(
1183 direct_neuron_not_adapted
1184 .to_string()
1185 .contains(&ganglion_name)
1186 );
1187
1188 let direct_transmit = AxonError::Transmit {
1189 neuron_name: neuron_name.clone(),
1190 ganglion_name: ganglion_name.clone(),
1191 ganglion_id,
1192 message: "direct message".to_string(),
1193 };
1194 assert!(direct_transmit.to_string().contains(&neuron_name));
1195 assert!(direct_transmit.to_string().contains(&ganglion_name));
1196 assert!(direct_transmit.to_string().contains("direct message"));
1197 }
1198}