1use crate::codec::{Codec, CodecName};
8use crate::erasure::payload::{erase_payload, erase_payload_raw};
9use crate::erasure::reactant::{
10 ReactantErased, ReactantRawErased, erase_reactant, erase_reactant_raw,
11};
12use crate::ganglion::{GanglionExternal, GanglionInternal};
13use crate::logging::LogTrace;
14use crate::payload::{Payload, PayloadRaw};
15use crate::reactant::{Reactant, ReactantRaw};
16use futures_util::future::join_all;
17use moka::future::Cache;
18use std::collections::{HashMap, HashSet};
19use std::future::Future;
20use std::marker::PhantomData;
21use std::pin::Pin;
22use std::sync::Arc;
23use tokio::sync::{Mutex, RwLock};
24use tracing::{Instrument, debug};
25use uuid::Uuid;
26
27#[allow(clippy::type_complexity)]
28pub struct PlexusInternalReactant<T, C>
29where
30 C: Codec<T> + CodecName + Send + Sync + 'static,
31 T: Send + Sync + 'static,
32{
33 current_ganglion_id: Uuid,
34 internal_ganglia:
35 Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>>,
36 external_ganglia:
37 Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>>,
38 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
39 _phantom: PhantomData<(T, C)>,
40}
41
42impl<T, C> Clone for PlexusInternalReactant<T, C>
43where
44 C: Codec<T> + CodecName + Send + Sync + 'static,
45 T: Send + Sync + 'static,
46{
47 fn clone(&self) -> Self {
48 Self {
49 current_ganglion_id: self.current_ganglion_id,
50 internal_ganglia: self.internal_ganglia.clone(),
51 external_ganglia: self.external_ganglia.clone(),
52 reactions: self.reactions.clone(),
53 _phantom: self._phantom,
54 }
55 }
56}
57
58impl<T, C> PlexusInternalReactant<T, C>
59where
60 C: Codec<T> + CodecName + Send + Sync + 'static,
61 T: Send + Sync + 'static,
62{
63 #[allow(clippy::type_complexity)]
64 pub fn new(
65 current_ganglion_id: Uuid,
66 internal_ganglia: Arc<
67 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
68 >,
69 external_ganglia: Arc<
70 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
71 >,
72 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
73 ) -> Self {
74 Self {
75 current_ganglion_id,
76 internal_ganglia,
77 external_ganglia,
78 reactions,
79 _phantom: PhantomData,
80 }
81 }
82}
83
84impl<T, C> Reactant<T, C> for PlexusInternalReactant<T, C>
85where
86 C: Codec<T> + CodecName + Send + Sync + 'static,
87 T: Send + Sync + 'static,
88{
89 fn react(
90 &self,
91 payload: Arc<Payload<T, C>>,
92 ) -> Pin<Box<dyn Future<Output = Result<(), crate::reactant::ReactantError>> + Send + 'static>> {
93 let current_ganglion_id = self.current_ganglion_id;
94 let internal_ganglia = self.internal_ganglia.clone();
95 let external_ganglia = self.external_ganglia.clone();
96 let reactions = self.reactions.clone();
97
98 let payload_clone = payload.clone();
99 Box::pin(
100 async move {
101 tracing::debug!("Starting reaction for ganglion {current_ganglion_id}");
102
103 let reaction_set_arc = reactions
106 .get_with(payload_clone.span_id(), async {
107 Arc::new(Mutex::new(HashSet::new()))
108 })
109 .await;
110
111 let reaction_set_copy = {
112 let mut set = reaction_set_arc.lock().await;
113 if set.contains(¤t_ganglion_id) {
114 tracing::debug!(
115 "Ganglion {current_ganglion_id} already processed reaction, returning early"
116 );
117 return Ok(());
118 }
119 set.insert(current_ganglion_id);
120 set.clone()
121 };
122
123 let erased_payload = erase_payload(payload_clone.clone());
125
126 type UnifiedTransmitFuture = Pin<
127 Box<
128 dyn Future<Output = Result<Vec<()>, crate::ganglion::GanglionError>> + Send,
129 >,
130 >;
131
132 tracing::debug!(
134 ganglion_id = %current_ganglion_id,
135 "PlexusInternalReactant::react - Acquiring read lock on internal_ganglia"
136 );
137 let internal_ganglia_guard = internal_ganglia.read().await;
138 tracing::debug!(
139 ganglion_id = %current_ganglion_id,
140 "PlexusInternalReactant::react - Acquired read lock on internal_ganglia"
141 );
142 let internal_ganglia_to_process: Vec<_> = internal_ganglia_guard
143 .iter()
144 .filter(|(ganglion_id, _ganglion)| {
145 *ganglion_id != ¤t_ganglion_id
146 && !reaction_set_copy.contains(ganglion_id)
147 })
148 .map(|(_ganglion_id, ganglion)| ganglion.clone())
149 .collect();
150 drop(internal_ganglia_guard);
151 tracing::debug!(
152 ganglion_id = %current_ganglion_id,
153 "PlexusInternalReactant::react - Released read lock on internal_ganglia"
154 );
155
156 tracing::debug!(
157 ganglion_id = %current_ganglion_id,
158 "PlexusInternalReactant::react - Acquiring read lock on external_ganglia"
159 );
160 let external_ganglia_guard = external_ganglia.read().await;
161 tracing::debug!(
162 ganglion_id = %current_ganglion_id,
163 "PlexusInternalReactant::react - Acquired read lock on external_ganglia"
164 );
165 let external_ganglia_to_process: Vec<_> = external_ganglia_guard
166 .iter()
167 .filter(|(ganglion_id, _ganglion)| !reaction_set_copy.contains(ganglion_id))
168 .map(|(_ganglion_id, ganglion)| ganglion.clone())
169 .collect();
170 drop(external_ganglia_guard);
171 tracing::debug!(
172 ganglion_id = %current_ganglion_id,
173 "PlexusInternalReactant::react - Released read lock on external_ganglia"
174 );
175
176 let internal_futures =
178 internal_ganglia_to_process
179 .into_iter()
180 .map(|ganglion_mutex| {
181 let payload = erased_payload.clone();
182 let current_id = current_ganglion_id;
183 Box::pin(async move {
184 tracing::debug!(ganglion_id = %current_id, "PlexusInternalReactant::react - Acquiring lock on internal ganglion mutex");
185 let future = {
186 let mut ganglion = ganglion_mutex.lock().await;
187 tracing::debug!(ganglion_id = %current_id, "PlexusInternalReactant::react - Acquired lock on internal ganglion mutex");
188 ganglion.transmit(payload.clone())
189 };
190 tracing::debug!("PlexusInternalReactant::react - Released lock on internal ganglion mutex, awaiting future");
191 future.await
192 }) as UnifiedTransmitFuture
193 });
194
195 let external_futures =
196 external_ganglia_to_process
197 .into_iter()
198 .map(|ganglion_mutex| {
199 let payload = erased_payload.clone();
200 let current_id = current_ganglion_id;
201 Box::pin(async move {
202 tracing::debug!(ganglion_id = %current_id, "PlexusInternalReactant::react - Acquiring lock on external ganglion mutex");
203 let future = {
204 let mut ganglion = ganglion_mutex.lock().await;
205 tracing::debug!(ganglion_id = %current_id, "PlexusInternalReactant::react - Acquired lock on external ganglion mutex");
206 ganglion.transmit(payload.clone())
207 };
208 tracing::debug!(ganglion_id = %current_id, "PlexusInternalReactant::react - Released lock on external ganglion mutex, awaiting future");
209 future.await
210 }) as UnifiedTransmitFuture
211 });
212
213 let all_futures: Vec<_> = internal_futures.chain(external_futures).collect();
214 tracing::debug!(
215 "PlexusInternalReactant::react - Joining {} futures",
216 all_futures.len()
217 );
218 let _results = join_all(all_futures).await;
219 tracing::debug!("PlexusInternalReactant::react - Completed joining futures");
220 Ok(())
221 }
222 .instrument(payload.span_debug("PlexusInternalReactant::react")),
223 )
224 }
225
226 fn erase(self: Box<Self>) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
227 self.new_erased()
228 }
229}
230
231#[allow(clippy::type_complexity)]
233pub struct PlexusExternalInternalReactant<T, C>
234where
235 C: Codec<T> + CodecName + Send + Sync + 'static,
236 T: Send + Sync + 'static,
237{
238 current_ganglion_id: Uuid,
239 internal_ganglia:
240 Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>>,
241 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
242 _phantom: PhantomData<(T, C)>,
243}
244
245impl<T, C> Clone for PlexusExternalInternalReactant<T, C>
246where
247 C: Codec<T> + CodecName + Send + Sync + 'static,
248 T: Send + Sync + 'static,
249{
250 fn clone(&self) -> Self {
251 Self {
252 current_ganglion_id: self.current_ganglion_id,
253 internal_ganglia: self.internal_ganglia.clone(),
254 reactions: self.reactions.clone(),
255 _phantom: self._phantom,
256 }
257 }
258}
259
260impl<T, C> PlexusExternalInternalReactant<T, C>
261where
262 C: Codec<T> + CodecName + Send + Sync + 'static,
263 T: Send + Sync + 'static,
264{
265 #[allow(clippy::type_complexity)]
266 pub fn new(
267 current_ganglion_id: Uuid,
268 internal_ganglia: Arc<
269 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
270 >,
271 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
272 ) -> Self {
273 Self {
274 current_ganglion_id,
275 internal_ganglia,
276 reactions,
277 _phantom: PhantomData,
278 }
279 }
280}
281
282impl<T, C> Reactant<T, C> for PlexusExternalInternalReactant<T, C>
283where
284 C: Codec<T> + CodecName + Send + Sync + 'static,
285 T: Send + Sync + 'static,
286{
287 fn react(
288 &self,
289 payload: Arc<Payload<T, C>>,
290 ) -> Pin<Box<dyn Future<Output = Result<(), crate::reactant::ReactantError>> + Send + 'static>> {
291 let current_ganglion_id = self.current_ganglion_id;
292 let internal_ganglia = self.internal_ganglia.clone();
293 let reactions = self.reactions.clone();
294
295 let payload_clone = payload.clone();
296 Box::pin(
297 async move {
298 tracing::debug!("Starting reaction for ganglion {current_ganglion_id}");
299
300 let reaction_set_arc = reactions
303 .get_with(payload_clone.span_id(), async {
304 Arc::new(Mutex::new(HashSet::new()))
305 })
306 .await;
307
308 let reaction_set_copy = {
309 let mut set = reaction_set_arc.lock().await;
310 if set.contains(¤t_ganglion_id) {
311 tracing::debug!(
312 "Ganglion {current_ganglion_id} already processed reaction, returning early"
313 );
314 return Ok(());
315 }
316 set.insert(current_ganglion_id);
317 set.clone()
318 };
319
320 let erased_payload = erase_payload(payload_clone.clone());
322
323 debug!(
325 current_ganglion_id = ?current_ganglion_id,
326 "PlexusExternalInternalReactant::react - Acquiring read lock on internal_ganglia"
327 );
328 let internal_ganglia_guard = internal_ganglia.read().await;
329 debug!(
330 current_ganglion_id = ?current_ganglion_id,
331 "PlexusExternalInternalReactant::react - Acquired read lock on internal_ganglia"
332 );
333 let internal_ganglia_to_process: Vec<_> = internal_ganglia_guard
334 .iter()
335 .filter(|(ganglion_id, _ganglion)| {
336 !reaction_set_copy.contains(ganglion_id)
337 })
338 .map(|(_ganglion_id, ganglion)| ganglion.clone())
339 .collect();
340 drop(internal_ganglia_guard);
341 debug!(
342 current_ganglion_id = ?current_ganglion_id,
343 "PlexusExternalInternalReactant::react - Released read lock on internal_ganglia"
344 );
345
346 let all_futures_len = internal_ganglia_to_process.len();
347 let internal_futures = internal_ganglia_to_process.into_iter().map(
349 |ganglion_mutex| {
350 let payload = erased_payload.clone();
351 let current_id = current_ganglion_id;
352 Box::pin(async move {
353 debug!(current_ganglion_id = ?current_id, "PlexusExternalInternalReactant::react - Acquiring lock on internal ganglion mutex");
354 let future = {
355 let mut ganglion = ganglion_mutex.lock().await;
356 debug!(current_ganglion_id = ?current_id, "PlexusExternalInternalReactant::react - Acquired lock on internal ganglion mutex");
357 ganglion.transmit(payload.clone())
358 };
359 debug!(current_ganglion_id = ?current_id, "PlexusExternalInternalReactant::react - Released lock on internal ganglion mutex, awaiting future");
360 future.await
361 })
362 },
363 );
364
365 tracing::debug!(
366 "PlexusExternalInternalReactant::react - Joining {} futures",
367 all_futures_len
368 );
369 let _results = join_all(internal_futures).await;
370 tracing::debug!(
371 "PlexusExternalInternalReactant::react - Completed joining futures"
372 );
373 Ok(())
374 }
375 .instrument(payload.span_debug("PlexusExternalInternalReactant::react")),
376 )
377 }
378
379 fn erase(self: Box<Self>) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
380 self.new_erased()
381 }
382}
383
384#[allow(clippy::type_complexity)]
386pub struct PlexusExternalExternalReactant<T, C>
387where
388 C: Codec<T> + CodecName + Send + Sync + 'static,
389 T: Send + Sync + 'static,
390{
391 current_ganglion_id: Uuid,
392 external_ganglia:
393 Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>>,
394 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
395 _phantom: PhantomData<(T, C)>,
396}
397
398impl<T, C> Clone for PlexusExternalExternalReactant<T, C>
399where
400 C: Codec<T> + CodecName + Send + Sync + 'static,
401 T: Send + Sync + 'static,
402{
403 fn clone(&self) -> Self {
404 Self {
405 current_ganglion_id: self.current_ganglion_id,
406 external_ganglia: self.external_ganglia.clone(),
407 reactions: self.reactions.clone(),
408 _phantom: self._phantom,
409 }
410 }
411}
412
413impl<T, C> PlexusExternalExternalReactant<T, C>
414where
415 C: Codec<T> + CodecName + Send + Sync + 'static,
416 T: Send + Sync + 'static,
417{
418 #[allow(clippy::type_complexity)]
419 pub fn new(
420 current_ganglion_id: Uuid,
421 external_ganglia: Arc<
422 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
423 >,
424 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
425 ) -> Self {
426 Self {
427 current_ganglion_id,
428 external_ganglia,
429 reactions,
430 _phantom: PhantomData,
431 }
432 }
433}
434
435impl<T, C> ReactantRaw<T, C> for PlexusExternalExternalReactant<T, C>
436where
437 C: Codec<T> + CodecName + Send + Sync + 'static,
438 T: Send + Sync + 'static,
439{
440 fn react(
441 &self,
442 payload: Arc<PayloadRaw<T, C>>,
443 ) -> Pin<Box<dyn Future<Output = Result<(), crate::reactant::ReactantError>> + Send + 'static>> {
444 let current_ganglion_id = self.current_ganglion_id;
445 let external_ganglia = self.external_ganglia.clone();
446 let reactions = self.reactions.clone();
447
448 let payload_clone = payload.clone();
449 Box::pin(
450 async move {
451 tracing::debug!("Starting reaction for ganglion {current_ganglion_id}");
452
453 let reaction_set_arc = reactions
456 .get_with(payload_clone.span_id(), async {
457 Arc::new(Mutex::new(HashSet::new()))
458 })
459 .await;
460
461 let reaction_set_copy = {
462 let mut set = reaction_set_arc.lock().await;
463 if set.contains(¤t_ganglion_id) {
464 tracing::debug!(
465 "PlexusExternalExternalReactant::react - Ganglion {current_ganglion_id} already processed reaction, returning early"
466 );
467 return Ok(());
468 }
469 set.insert(current_ganglion_id);
470 set.clone()
471 };
472
473 let erased_payload = erase_payload_raw(payload_clone.clone());
475
476 debug!(
478 current_ganglion_id = ?current_ganglion_id,
479 "PlexusExternalExternalReactant::react - Acquiring read lock on external_ganglia"
480 );
481 let external_ganglia_guard = external_ganglia.read().await;
482 debug!(
483 current_ganglion_id = ?current_ganglion_id,
484 "PlexusExternalExternalReactant::react - Acquired read lock on external_ganglia"
485 );
486 let external_ganglia_to_process: Vec<_> = external_ganglia_guard
487 .iter()
488 .filter(|(ganglion_id, _ganglion)| {
489 *ganglion_id != ¤t_ganglion_id
490 && !reaction_set_copy.contains(ganglion_id)
491 })
492 .map(|(_ganglion_id, ganglion)| ganglion.clone())
493 .collect();
494 drop(external_ganglia_guard);
495 debug!(
496 current_ganglion_id = ?current_ganglion_id,
497 "PlexusExternalExternalReactant::react - Released read lock on external_ganglia"
498 );
499
500 let external_futures_len = external_ganglia_to_process.len();
502 let external_futures =
503 external_ganglia_to_process
504 .into_iter()
505 .map(|ganglion_mutex| {
506 let payload = erased_payload.clone();
507 let current_id = current_ganglion_id;
508 Box::pin(async move {
509 debug!(current_ganglion_id = ?current_id, "PlexusExternalExternalReactant::react - Acquiring lock on external ganglion mutex");
510 let future = {
511 let mut ganglion = ganglion_mutex.lock().await;
512 debug!(current_ganglion_id = ?current_id, "PlexusExternalExternalReactant::react - Acquired lock on external ganglion mutex");
513 ganglion.transmit_encoded(payload.clone())
514 };
515 debug!(current_ganglion_id = ?current_id, "PlexusExternalExternalReactant::react - Released lock on external ganglion mutex, awaiting future");
516 future.await
517 })
518 });
519
520 tracing::debug!(
521 "PlexusExternalExternalReactant::react - Joining {} futures",
522 external_futures_len
523 );
524 let _results = join_all(external_futures).await;
525 tracing::debug!(
526 "PlexusExternalExternalReactant::react - Completed joining futures"
527 );
528 Ok(())
529 }
530 .instrument(payload.span_debug("PlexusExternalExternalReactant::react")),
531 )
532 }
533
534 fn erase_raw(self: Box<Self>) -> Arc<dyn ReactantRawErased + Send + Sync + 'static> {
535 self.new_erased_raw()
536 }
537}
538
539pub trait ErasedInternalReactantFactory: Send + Sync + 'static {
540 #[allow(clippy::type_complexity)]
541 fn create_reactant(
542 &self,
543 current_ganglion_id: Uuid,
544 internal_ganglia: Arc<
545 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
546 >,
547 external_ganglia: Arc<
548 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
549 >,
550 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
551 ) -> Arc<dyn ReactantErased + Send + Sync + 'static>;
552}
553
554pub trait ErasedExternalInternalReactantFactory: Send + Sync + 'static {
555 #[allow(clippy::type_complexity)]
556 fn create_reactant(
557 &self,
558 current_ganglion_id: Uuid,
559 internal_ganglia: Arc<
560 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
561 >,
562 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
563 ) -> Arc<dyn ReactantErased + Send + Sync + 'static>;
564}
565
566pub trait ErasedExternalExternalReactantFactory: Send + Sync + 'static {
567 #[allow(clippy::type_complexity)]
568 fn create_reactant(
569 &self,
570 current_ganglion_id: Uuid,
571 external_ganglia: Arc<
572 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
573 >,
574 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
575 ) -> Arc<dyn ReactantRawErased + Send + Sync + 'static>;
576}
577
/// Stateless factory marker parameterised by a payload type `T` and a codec
/// type `C`.
///
/// It stores nothing but a `PhantomData`, so values of this type are
/// zero-sized.
pub struct PlexusInternalReactantFactory<T, C> {
    _phantom: PhantomData<(T, C)>,
}

impl<T, C> PlexusInternalReactantFactory<T, C> {
    /// Creates a factory; identical to [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }
}

// Written by hand so that no `T: Default` / `C: Default` bound is required.
impl<T, C> Default for PlexusInternalReactantFactory<T, C> {
    fn default() -> Self {
        PlexusInternalReactantFactory {
            _phantom: PhantomData,
        }
    }
}
595
596impl<T, C> ErasedInternalReactantFactory for PlexusInternalReactantFactory<T, C>
597where
598 C: Codec<T> + CodecName + Send + Sync + 'static,
599 T: Send + Sync + 'static,
600{
601 fn create_reactant(
602 &self,
603 current_ganglion_id: Uuid,
604 internal_ganglia: Arc<
605 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
606 >,
607 external_ganglia: Arc<
608 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
609 >,
610 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
611 ) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
612 erase_reactant(Box::new(PlexusInternalReactant::<T, C>::new(
613 current_ganglion_id,
614 internal_ganglia,
615 external_ganglia,
616 reactions,
617 )))
618 }
619}
620
/// Stateless factory marker parameterised by a payload type `T` and a codec
/// type `C`.
///
/// It stores nothing but a `PhantomData`, so values of this type are
/// zero-sized.
pub struct PlexusExternalInternalReactantFactory<T, C> {
    _phantom: PhantomData<(T, C)>,
}

impl<T, C> PlexusExternalInternalReactantFactory<T, C> {
    /// Creates a factory; identical to [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }
}

// Written by hand so that no `T: Default` / `C: Default` bound is required.
impl<T, C> Default for PlexusExternalInternalReactantFactory<T, C> {
    fn default() -> Self {
        PlexusExternalInternalReactantFactory {
            _phantom: PhantomData,
        }
    }
}
638
639impl<T, C> ErasedExternalInternalReactantFactory for PlexusExternalInternalReactantFactory<T, C>
640where
641 C: Codec<T> + CodecName + Send + Sync + 'static,
642 T: Send + Sync + 'static,
643{
644 fn create_reactant(
645 &self,
646 current_ganglion_id: Uuid,
647 internal_ganglia: Arc<
648 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
649 >,
650 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
651 ) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
652 erase_reactant(Box::new(PlexusExternalInternalReactant::<T, C>::new(
653 current_ganglion_id,
654 internal_ganglia,
655 reactions,
656 )))
657 }
658}
659
/// Stateless factory marker parameterised by a payload type `T` and a codec
/// type `C`.
///
/// It stores nothing but a `PhantomData`, so values of this type are
/// zero-sized.
pub struct PlexusExternalExternalReactantFactory<T, C> {
    _phantom: PhantomData<(T, C)>,
}

impl<T, C> PlexusExternalExternalReactantFactory<T, C> {
    /// Creates a factory; identical to [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }
}

// Written by hand so that no `T: Default` / `C: Default` bound is required.
impl<T, C> Default for PlexusExternalExternalReactantFactory<T, C> {
    fn default() -> Self {
        PlexusExternalExternalReactantFactory {
            _phantom: PhantomData,
        }
    }
}
677
678impl<T, C> ErasedExternalExternalReactantFactory for PlexusExternalExternalReactantFactory<T, C>
679where
680 C: Codec<T> + CodecName + Send + Sync + 'static,
681 T: Send + Sync + 'static,
682{
683 fn create_reactant(
684 &self,
685 current_ganglion_id: Uuid,
686 external_ganglia: Arc<
687 RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
688 >,
689 reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
690 ) -> Arc<dyn ReactantRawErased + Send + Sync + 'static> {
691 erase_reactant_raw(Box::new(PlexusExternalExternalReactant::<T, C>::new(
692 current_ganglion_id,
693 external_ganglia,
694 reactions,
695 )))
696 }
697}
698
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{DebugCodec, DebugStruct};
    use std::time::Duration;
    use uuid::Uuid;

    // Aliases for the registry and cache types shared by every test below.
    type InternalGanglia =
        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>>;
    type ExternalGanglia =
        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>>;
    type Reactions = Cache<u64, Arc<Mutex<HashSet<Uuid>>>>;

    /// An internal registry with no ganglia in it.
    fn empty_internal_ganglia() -> InternalGanglia {
        Arc::new(RwLock::new(HashMap::new()))
    }

    /// An external registry with no ganglia in it.
    fn empty_external_ganglia() -> ExternalGanglia {
        Arc::new(RwLock::new(HashMap::new()))
    }

    /// A reaction cache whose entries live for 60 seconds.
    fn reaction_cache() -> Reactions {
        Cache::builder()
            .time_to_live(Duration::from_secs(60))
            .build()
    }

    /// The internal factory hands back a live type-erased reactant.
    #[tokio::test]
    async fn test_plexus_internal_reactant_factory() {
        let factory = PlexusInternalReactantFactory::<DebugStruct, DebugCodec>::new();
        let ganglion_id = Uuid::now_v7();

        let reactant = factory.create_reactant(
            ganglion_id,
            empty_internal_ganglia(),
            empty_external_ganglia(),
            reaction_cache(),
        );

        assert!(Arc::strong_count(&reactant) >= 1);
    }

    /// The external-to-internal factory hands back a live type-erased reactant.
    #[tokio::test]
    async fn test_plexus_external_internal_reactant_factory() {
        let factory = PlexusExternalInternalReactantFactory::<DebugStruct, DebugCodec>::new();
        let ganglion_id = Uuid::now_v7();

        let reactant =
            factory.create_reactant(ganglion_id, empty_internal_ganglia(), reaction_cache());

        assert!(Arc::strong_count(&reactant) >= 1);
    }

    /// The external-to-external factory hands back a live type-erased raw reactant.
    #[tokio::test]
    async fn test_plexus_external_external_reactant_factory() {
        let factory = PlexusExternalExternalReactantFactory::<DebugStruct, DebugCodec>::new();
        let ganglion_id = Uuid::now_v7();

        let reactant =
            factory.create_reactant(ganglion_id, empty_external_ganglia(), reaction_cache());

        assert!(Arc::strong_count(&reactant) >= 1);
    }
}