Skip to main content

plexor_core/plexus/
reactants.rs

1// Copyright 2025 Alecks Gates
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use crate::codec::{Codec, CodecName};
8use crate::erasure::payload::{erase_payload, erase_payload_raw};
9use crate::erasure::reactant::{
10    ReactantErased, ReactantRawErased, erase_reactant, erase_reactant_raw,
11};
12use crate::ganglion::{GanglionExternal, GanglionInternal};
13use crate::logging::LogTrace;
14use crate::payload::{Payload, PayloadRaw};
15use crate::reactant::{Reactant, ReactantRaw};
16use futures_util::future::join_all;
17use moka::future::Cache;
18use std::collections::{HashMap, HashSet};
19use std::future::Future;
20use std::marker::PhantomData;
21use std::pin::Pin;
22use std::sync::Arc;
23use tokio::sync::{Mutex, RwLock};
24use tracing::{Instrument, debug};
25use uuid::Uuid;
26
27#[allow(clippy::type_complexity)]
28pub struct PlexusInternalReactant<T, C>
29where
30    C: Codec<T> + CodecName + Send + Sync + 'static,
31    T: Send + Sync + 'static,
32{
33    current_ganglion_id: Uuid,
34    internal_ganglia:
35        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>>,
36    external_ganglia:
37        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>>,
38    reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
39    _phantom: PhantomData<(T, C)>,
40}
41
42impl<T, C> Clone for PlexusInternalReactant<T, C>
43where
44    C: Codec<T> + CodecName + Send + Sync + 'static,
45    T: Send + Sync + 'static,
46{
47    fn clone(&self) -> Self {
48        Self {
49            current_ganglion_id: self.current_ganglion_id,
50            internal_ganglia: self.internal_ganglia.clone(),
51            external_ganglia: self.external_ganglia.clone(),
52            reactions: self.reactions.clone(),
53            _phantom: self._phantom,
54        }
55    }
56}
57
58impl<T, C> PlexusInternalReactant<T, C>
59where
60    C: Codec<T> + CodecName + Send + Sync + 'static,
61    T: Send + Sync + 'static,
62{
63    #[allow(clippy::type_complexity)]
64    pub fn new(
65        current_ganglion_id: Uuid,
66        internal_ganglia: Arc<
67            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
68        >,
69        external_ganglia: Arc<
70            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
71        >,
72        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
73    ) -> Self {
74        Self {
75            current_ganglion_id,
76            internal_ganglia,
77            external_ganglia,
78            reactions,
79            _phantom: PhantomData,
80        }
81    }
82}
83
84impl<T, C> Reactant<T, C> for PlexusInternalReactant<T, C>
85where
86    C: Codec<T> + CodecName + Send + Sync + 'static,
87    T: Send + Sync + 'static,
88{
89    fn react(
90        &self,
91        payload: Arc<Payload<T, C>>,
92    ) -> Pin<Box<dyn Future<Output = Result<(), crate::reactant::ReactantError>> + Send + 'static>> {
93        let current_ganglion_id = self.current_ganglion_id;
94        let internal_ganglia = self.internal_ganglia.clone();
95        let external_ganglia = self.external_ganglia.clone();
96        let reactions = self.reactions.clone();
97
98        let payload_clone = payload.clone();
99        Box::pin(
100            async move {
101                tracing::debug!("Starting reaction for ganglion {current_ganglion_id}");
102
103                // Use Cache to track reactions. We key by span_id which remains stable within the Plexus.
104                // Use Cache to track reactions. We key by span_id which remains stable within the Plexus.
105                let reaction_set_arc = reactions
106                    .get_with(payload_clone.span_id(), async {
107                        Arc::new(Mutex::new(HashSet::new()))
108                    })
109                    .await;
110
111                let reaction_set_copy = {
112                    let mut set = reaction_set_arc.lock().await;
113                    if set.contains(&current_ganglion_id) {
114                        tracing::debug!(
115                        "Ganglion {current_ganglion_id} already processed reaction, returning early"
116                    );
117                        return Ok(());
118                    }
119                    set.insert(current_ganglion_id);
120                    set.clone()
121                };
122
123                // Reuse the original payload for transmission, preserving span_id and parent_id.
124                let erased_payload = erase_payload(payload_clone.clone());
125
126                type UnifiedTransmitFuture = Pin<
127                    Box<
128                        dyn Future<Output = Result<Vec<()>, crate::ganglion::GanglionError>> + Send,
129                    >,
130                >;
131
132                // Collect ganglion references first, then release locks to avoid circular dependencies
133                tracing::debug!(
134                ganglion_id = %current_ganglion_id,
135                "PlexusInternalReactant::react - Acquiring read lock on internal_ganglia"
136            );
137                let internal_ganglia_guard = internal_ganglia.read().await;
138                tracing::debug!(
139                ganglion_id = %current_ganglion_id,
140                "PlexusInternalReactant::react - Acquired read lock on internal_ganglia"
141            );
142                let internal_ganglia_to_process: Vec<_> = internal_ganglia_guard
143                    .iter()
144                    .filter(|(ganglion_id, _ganglion)| {
145                        *ganglion_id != &current_ganglion_id
146                            && !reaction_set_copy.contains(ganglion_id)
147                    })
148                    .map(|(_ganglion_id, ganglion)| ganglion.clone())
149                    .collect();
150                drop(internal_ganglia_guard);
151                tracing::debug!(
152                ganglion_id = %current_ganglion_id,
153                "PlexusInternalReactant::react - Released read lock on internal_ganglia"
154            );
155
156                tracing::debug!(
157                ganglion_id = %current_ganglion_id,
158                "PlexusInternalReactant::react - Acquiring read lock on external_ganglia"
159            );
160                let external_ganglia_guard = external_ganglia.read().await;
161                tracing::debug!(
162                ganglion_id = %current_ganglion_id,
163                "PlexusInternalReactant::react - Acquired read lock on external_ganglia"
164            );
165                let external_ganglia_to_process: Vec<_> = external_ganglia_guard
166                    .iter()
167                    .filter(|(ganglion_id, _ganglion)| !reaction_set_copy.contains(ganglion_id))
168                    .map(|(_ganglion_id, ganglion)| ganglion.clone())
169                    .collect();
170                drop(external_ganglia_guard);
171                tracing::debug!(
172                ganglion_id = %current_ganglion_id,
173                "PlexusInternalReactant::react - Released read lock on external_ganglia"
174            );
175
176                // Now process ganglia without holding collection locks
177                let internal_futures =
178                    internal_ganglia_to_process
179                        .into_iter()
180                        .map(|ganglion_mutex| {
181                            let payload = erased_payload.clone();
182                            let current_id = current_ganglion_id;
183                            Box::pin(async move {
184                    tracing::debug!(ganglion_id = %current_id, "PlexusInternalReactant::react - Acquiring lock on internal ganglion mutex");
185                    let future = {
186                        let mut ganglion = ganglion_mutex.lock().await;
187                        tracing::debug!(ganglion_id = %current_id, "PlexusInternalReactant::react - Acquired lock on internal ganglion mutex");
188                        ganglion.transmit(payload.clone())
189                    };
190                    tracing::debug!("PlexusInternalReactant::react - Released lock on internal ganglion mutex, awaiting future");
191                    future.await
192                }) as UnifiedTransmitFuture
193                        });
194
195                let external_futures =
196                    external_ganglia_to_process
197                        .into_iter()
198                        .map(|ganglion_mutex| {
199                            let payload = erased_payload.clone();
200                            let current_id = current_ganglion_id;
201                            Box::pin(async move {
202                    tracing::debug!(ganglion_id = %current_id, "PlexusInternalReactant::react - Acquiring lock on external ganglion mutex");
203                    let future = {
204                        let mut ganglion = ganglion_mutex.lock().await;
205                        tracing::debug!(ganglion_id = %current_id, "PlexusInternalReactant::react - Acquired lock on external ganglion mutex");
206                        ganglion.transmit(payload.clone())
207                    };
208                    tracing::debug!(ganglion_id = %current_id, "PlexusInternalReactant::react - Released lock on external ganglion mutex, awaiting future");
209                    future.await
210                }) as UnifiedTransmitFuture
211                        });
212
213                let all_futures: Vec<_> = internal_futures.chain(external_futures).collect();
214                tracing::debug!(
215                    "PlexusInternalReactant::react - Joining {} futures",
216                    all_futures.len()
217                );
218                let _results = join_all(all_futures).await;
219                tracing::debug!("PlexusInternalReactant::react - Completed joining futures");
220                Ok(())
221            }
222            .instrument(payload.span_debug("PlexusInternalReactant::react")),
223        )
224    }
225
226    fn erase(self: Box<Self>) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
227        self.new_erased()
228    }
229}
230
231/// Safe wrapper for plexus internal reaction from external ganglia
232#[allow(clippy::type_complexity)]
233pub struct PlexusExternalInternalReactant<T, C>
234where
235    C: Codec<T> + CodecName + Send + Sync + 'static,
236    T: Send + Sync + 'static,
237{
238    current_ganglion_id: Uuid,
239    internal_ganglia:
240        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>>,
241    reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
242    _phantom: PhantomData<(T, C)>,
243}
244
245impl<T, C> Clone for PlexusExternalInternalReactant<T, C>
246where
247    C: Codec<T> + CodecName + Send + Sync + 'static,
248    T: Send + Sync + 'static,
249{
250    fn clone(&self) -> Self {
251        Self {
252            current_ganglion_id: self.current_ganglion_id,
253            internal_ganglia: self.internal_ganglia.clone(),
254            reactions: self.reactions.clone(),
255            _phantom: self._phantom,
256        }
257    }
258}
259
260impl<T, C> PlexusExternalInternalReactant<T, C>
261where
262    C: Codec<T> + CodecName + Send + Sync + 'static,
263    T: Send + Sync + 'static,
264{
265    #[allow(clippy::type_complexity)]
266    pub fn new(
267        current_ganglion_id: Uuid,
268        internal_ganglia: Arc<
269            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
270        >,
271        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
272    ) -> Self {
273        Self {
274            current_ganglion_id,
275            internal_ganglia,
276            reactions,
277            _phantom: PhantomData,
278        }
279    }
280}
281
282impl<T, C> Reactant<T, C> for PlexusExternalInternalReactant<T, C>
283where
284    C: Codec<T> + CodecName + Send + Sync + 'static,
285    T: Send + Sync + 'static,
286{
287    fn react(
288        &self,
289        payload: Arc<Payload<T, C>>,
290    ) -> Pin<Box<dyn Future<Output = Result<(), crate::reactant::ReactantError>> + Send + 'static>> {
291        let current_ganglion_id = self.current_ganglion_id;
292        let internal_ganglia = self.internal_ganglia.clone();
293        let reactions = self.reactions.clone();
294
295        let payload_clone = payload.clone();
296        Box::pin(
297            async move {
298                tracing::debug!("Starting reaction for ganglion {current_ganglion_id}");
299
300                // Use Cache to track reactions
301                // Use Cache to track reactions. We key by span_id which remains stable within the Plexus.
302                let reaction_set_arc = reactions
303                    .get_with(payload_clone.span_id(), async {
304                        Arc::new(Mutex::new(HashSet::new()))
305                    })
306                    .await;
307
308                let reaction_set_copy = {
309                    let mut set = reaction_set_arc.lock().await;
310                    if set.contains(&current_ganglion_id) {
311                        tracing::debug!(
312                        "Ganglion {current_ganglion_id} already processed reaction, returning early"
313                    );
314                        return Ok(());
315                    }
316                    set.insert(current_ganglion_id);
317                    set.clone()
318                };
319
320                // Reuse the original payload for transmission
321                let erased_payload = erase_payload(payload_clone.clone());
322
323                // Collect ganglion references first, then release locks to avoid circular dependencies
324                debug!(
325                current_ganglion_id = ?current_ganglion_id,
326                "PlexusExternalInternalReactant::react - Acquiring read lock on internal_ganglia"
327            );
328                let internal_ganglia_guard = internal_ganglia.read().await;
329                debug!(
330                current_ganglion_id = ?current_ganglion_id,
331                "PlexusExternalInternalReactant::react - Acquired read lock on internal_ganglia"
332            );
333                let internal_ganglia_to_process: Vec<_> = internal_ganglia_guard
334                    .iter()
335                    .filter(|(ganglion_id, _ganglion)| {
336                        !reaction_set_copy.contains(ganglion_id)
337                    })
338                    .map(|(_ganglion_id, ganglion)| ganglion.clone())
339                    .collect();
340                drop(internal_ganglia_guard);
341                debug!(
342                current_ganglion_id = ?current_ganglion_id,
343                "PlexusExternalInternalReactant::react - Released read lock on internal_ganglia"
344            );
345
346                let all_futures_len = internal_ganglia_to_process.len();
347                // Now process ganglia without holding collection locks
348                let internal_futures = internal_ganglia_to_process.into_iter().map(
349                    |ganglion_mutex| {
350                        let payload = erased_payload.clone();
351                        let current_id = current_ganglion_id;
352                        Box::pin(async move {
353                    debug!(current_ganglion_id = ?current_id, "PlexusExternalInternalReactant::react - Acquiring lock on internal ganglion mutex");
354                    let future = {
355                        let mut ganglion = ganglion_mutex.lock().await;
356                        debug!(current_ganglion_id = ?current_id, "PlexusExternalInternalReactant::react - Acquired lock on internal ganglion mutex");
357                        ganglion.transmit(payload.clone())
358                    };
359                    debug!(current_ganglion_id = ?current_id, "PlexusExternalInternalReactant::react - Released lock on internal ganglion mutex, awaiting future");
360                    future.await
361                })
362                    },
363                );
364
365                tracing::debug!(
366                    "PlexusExternalInternalReactant::react - Joining {} futures",
367                    all_futures_len
368                );
369                let _results = join_all(internal_futures).await;
370                tracing::debug!(
371                    "PlexusExternalInternalReactant::react - Completed joining futures"
372                );
373                Ok(())
374            }
375            .instrument(payload.span_debug("PlexusExternalInternalReactant::react")),
376        )
377    }
378
379    fn erase(self: Box<Self>) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
380        self.new_erased()
381    }
382}
383
384/// Safe wrapper for plexus external reaction from external ganglia
385#[allow(clippy::type_complexity)]
386pub struct PlexusExternalExternalReactant<T, C>
387where
388    C: Codec<T> + CodecName + Send + Sync + 'static,
389    T: Send + Sync + 'static,
390{
391    current_ganglion_id: Uuid,
392    external_ganglia:
393        Arc<RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>>,
394    reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
395    _phantom: PhantomData<(T, C)>,
396}
397
398impl<T, C> Clone for PlexusExternalExternalReactant<T, C>
399where
400    C: Codec<T> + CodecName + Send + Sync + 'static,
401    T: Send + Sync + 'static,
402{
403    fn clone(&self) -> Self {
404        Self {
405            current_ganglion_id: self.current_ganglion_id,
406            external_ganglia: self.external_ganglia.clone(),
407            reactions: self.reactions.clone(),
408            _phantom: self._phantom,
409        }
410    }
411}
412
413impl<T, C> PlexusExternalExternalReactant<T, C>
414where
415    C: Codec<T> + CodecName + Send + Sync + 'static,
416    T: Send + Sync + 'static,
417{
418    #[allow(clippy::type_complexity)]
419    pub fn new(
420        current_ganglion_id: Uuid,
421        external_ganglia: Arc<
422            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
423        >,
424        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
425    ) -> Self {
426        Self {
427            current_ganglion_id,
428            external_ganglia,
429            reactions,
430            _phantom: PhantomData,
431        }
432    }
433}
434
435impl<T, C> ReactantRaw<T, C> for PlexusExternalExternalReactant<T, C>
436where
437    C: Codec<T> + CodecName + Send + Sync + 'static,
438    T: Send + Sync + 'static,
439{
440    fn react(
441        &self,
442        payload: Arc<PayloadRaw<T, C>>,
443    ) -> Pin<Box<dyn Future<Output = Result<(), crate::reactant::ReactantError>> + Send + 'static>> {
444        let current_ganglion_id = self.current_ganglion_id;
445        let external_ganglia = self.external_ganglia.clone();
446        let reactions = self.reactions.clone();
447
448        let payload_clone = payload.clone();
449        Box::pin(
450            async move {
451                tracing::debug!("Starting reaction for ganglion {current_ganglion_id}");
452
453                // Use Cache to track reactions
454                // Use Cache to track reactions. We key by span_id which remains stable within the Plexus.
455                let reaction_set_arc = reactions
456                    .get_with(payload_clone.span_id(), async {
457                        Arc::new(Mutex::new(HashSet::new()))
458                    })
459                    .await;
460
461                let reaction_set_copy = {
462                    let mut set = reaction_set_arc.lock().await;
463                    if set.contains(&current_ganglion_id) {
464                        tracing::debug!(
465                        "PlexusExternalExternalReactant::react - Ganglion {current_ganglion_id} already processed reaction, returning early"
466                    );
467                        return Ok(());
468                    }
469                    set.insert(current_ganglion_id);
470                    set.clone()
471                };
472
473                // Reuse the original payload for transmission
474                let erased_payload = erase_payload_raw(payload_clone.clone());
475
476                // Collect ganglion references first, then release locks to avoid circular dependencies
477                debug!(
478                current_ganglion_id = ?current_ganglion_id,
479                "PlexusExternalExternalReactant::react - Acquiring read lock on external_ganglia"
480            );
481                let external_ganglia_guard = external_ganglia.read().await;
482                debug!(
483                current_ganglion_id = ?current_ganglion_id,
484                "PlexusExternalExternalReactant::react - Acquired read lock on external_ganglia"
485            );
486                let external_ganglia_to_process: Vec<_> = external_ganglia_guard
487                    .iter()
488                    .filter(|(ganglion_id, _ganglion)| {
489                        *ganglion_id != &current_ganglion_id
490                            && !reaction_set_copy.contains(ganglion_id)
491                    })
492                    .map(|(_ganglion_id, ganglion)| ganglion.clone())
493                    .collect();
494                drop(external_ganglia_guard);
495                debug!(
496                current_ganglion_id = ?current_ganglion_id,
497                "PlexusExternalExternalReactant::react - Released read lock on external_ganglia"
498            );
499
500                // Now process ganglia without holding collection locks
501                let external_futures_len = external_ganglia_to_process.len();
502                let external_futures =
503                    external_ganglia_to_process
504                        .into_iter()
505                        .map(|ganglion_mutex| {
506                            let payload = erased_payload.clone();
507                            let current_id = current_ganglion_id;
508                            Box::pin(async move {
509                    debug!(current_ganglion_id = ?current_id, "PlexusExternalExternalReactant::react - Acquiring lock on external ganglion mutex");
510                    let future = {
511                        let mut ganglion = ganglion_mutex.lock().await;
512                        debug!(current_ganglion_id = ?current_id, "PlexusExternalExternalReactant::react - Acquired lock on external ganglion mutex");
513                        ganglion.transmit_encoded(payload.clone())
514                    };
515                    debug!(current_ganglion_id = ?current_id, "PlexusExternalExternalReactant::react - Released lock on external ganglion mutex, awaiting future");
516                    future.await
517                })
518                        });
519
520                tracing::debug!(
521                    "PlexusExternalExternalReactant::react - Joining {} futures",
522                    external_futures_len
523                );
524                let _results = join_all(external_futures).await;
525                tracing::debug!(
526                    "PlexusExternalExternalReactant::react - Completed joining futures"
527                );
528                Ok(())
529            }
530            .instrument(payload.span_debug("PlexusExternalExternalReactant::react")),
531        )
532    }
533
534    fn erase_raw(self: Box<Self>) -> Arc<dyn ReactantRawErased + Send + Sync + 'static> {
535        self.new_erased_raw()
536    }
537}
538
539pub trait ErasedInternalReactantFactory: Send + Sync + 'static {
540    #[allow(clippy::type_complexity)]
541    fn create_reactant(
542        &self,
543        current_ganglion_id: Uuid,
544        internal_ganglia: Arc<
545            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
546        >,
547        external_ganglia: Arc<
548            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
549        >,
550        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
551    ) -> Arc<dyn ReactantErased + Send + Sync + 'static>;
552}
553
554pub trait ErasedExternalInternalReactantFactory: Send + Sync + 'static {
555    #[allow(clippy::type_complexity)]
556    fn create_reactant(
557        &self,
558        current_ganglion_id: Uuid,
559        internal_ganglia: Arc<
560            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
561        >,
562        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
563    ) -> Arc<dyn ReactantErased + Send + Sync + 'static>;
564}
565
566pub trait ErasedExternalExternalReactantFactory: Send + Sync + 'static {
567    #[allow(clippy::type_complexity)]
568    fn create_reactant(
569        &self,
570        current_ganglion_id: Uuid,
571        external_ganglia: Arc<
572            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
573        >,
574        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
575    ) -> Arc<dyn ReactantRawErased + Send + Sync + 'static>;
576}
577
/// Zero-sized factory that builds type-erased [`PlexusInternalReactant`]s for
/// payload type `T` and codec `C`.
pub struct PlexusInternalReactantFactory<T, C> {
    /// Records `T` and `C` without storing a value of either.
    _phantom: PhantomData<(T, C)>,
}

impl<T, C> PlexusInternalReactantFactory<T, C> {
    /// Creates the factory; equivalent to [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }
}

impl<T, C> Default for PlexusInternalReactantFactory<T, C> {
    fn default() -> Self {
        Self {
            _phantom: PhantomData,
        }
    }
}
595
596impl<T, C> ErasedInternalReactantFactory for PlexusInternalReactantFactory<T, C>
597where
598    C: Codec<T> + CodecName + Send + Sync + 'static,
599    T: Send + Sync + 'static,
600{
601    fn create_reactant(
602        &self,
603        current_ganglion_id: Uuid,
604        internal_ganglia: Arc<
605            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
606        >,
607        external_ganglia: Arc<
608            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
609        >,
610        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
611    ) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
612        erase_reactant(Box::new(PlexusInternalReactant::<T, C>::new(
613            current_ganglion_id,
614            internal_ganglia,
615            external_ganglia,
616            reactions,
617        )))
618    }
619}
620
/// Zero-sized factory that builds type-erased
/// [`PlexusExternalInternalReactant`]s for payload type `T` and codec `C`.
pub struct PlexusExternalInternalReactantFactory<T, C> {
    /// Records `T` and `C` without storing a value of either.
    _phantom: PhantomData<(T, C)>,
}

impl<T, C> PlexusExternalInternalReactantFactory<T, C> {
    /// Creates the factory; equivalent to [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }
}

impl<T, C> Default for PlexusExternalInternalReactantFactory<T, C> {
    fn default() -> Self {
        Self {
            _phantom: PhantomData,
        }
    }
}
638
639impl<T, C> ErasedExternalInternalReactantFactory for PlexusExternalInternalReactantFactory<T, C>
640where
641    C: Codec<T> + CodecName + Send + Sync + 'static,
642    T: Send + Sync + 'static,
643{
644    fn create_reactant(
645        &self,
646        current_ganglion_id: Uuid,
647        internal_ganglia: Arc<
648            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>>,
649        >,
650        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
651    ) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
652        erase_reactant(Box::new(PlexusExternalInternalReactant::<T, C>::new(
653            current_ganglion_id,
654            internal_ganglia,
655            reactions,
656        )))
657    }
658}
659
/// Zero-sized factory that builds type-erased
/// [`PlexusExternalExternalReactant`]s for payload type `T` and codec `C`.
pub struct PlexusExternalExternalReactantFactory<T, C> {
    /// Records `T` and `C` without storing a value of either.
    _phantom: PhantomData<(T, C)>,
}

impl<T, C> PlexusExternalExternalReactantFactory<T, C> {
    /// Creates the factory; equivalent to [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }
}

impl<T, C> Default for PlexusExternalExternalReactantFactory<T, C> {
    fn default() -> Self {
        Self {
            _phantom: PhantomData,
        }
    }
}
677
678impl<T, C> ErasedExternalExternalReactantFactory for PlexusExternalExternalReactantFactory<T, C>
679where
680    C: Codec<T> + CodecName + Send + Sync + 'static,
681    T: Send + Sync + 'static,
682{
683    fn create_reactant(
684        &self,
685        current_ganglion_id: Uuid,
686        external_ganglia: Arc<
687            RwLock<HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>>,
688        >,
689        reactions: Cache<u64, Arc<Mutex<HashSet<Uuid>>>>,
690    ) -> Arc<dyn ReactantRawErased + Send + Sync + 'static> {
691        erase_reactant_raw(Box::new(PlexusExternalExternalReactant::<T, C>::new(
692            current_ganglion_id,
693            external_ganglia,
694            reactions,
695        )))
696    }
697}
698
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{DebugCodec, DebugStruct};
    use std::time::Duration;
    use uuid::Uuid;

    type InternalGangliaMap =
        HashMap<Uuid, Arc<Mutex<dyn GanglionInternal + Send + Sync + 'static>>>;
    type ExternalGangliaMap =
        HashMap<Uuid, Arc<Mutex<dyn GanglionExternal + Send + Sync + 'static>>>;
    type ReactionCache = Cache<u64, Arc<Mutex<HashSet<Uuid>>>>;

    /// Shared, empty map of internal ganglia.
    fn empty_internal_ganglia() -> Arc<RwLock<InternalGangliaMap>> {
        Arc::new(RwLock::new(InternalGangliaMap::new()))
    }

    /// Shared, empty map of external ganglia.
    fn empty_external_ganglia() -> Arc<RwLock<ExternalGangliaMap>> {
        Arc::new(RwLock::new(ExternalGangliaMap::new()))
    }

    /// Reaction cache with a 60 second time-to-live.
    fn reaction_cache() -> ReactionCache {
        Cache::builder()
            .time_to_live(Duration::from_secs(60))
            .build()
    }

    #[tokio::test]
    async fn test_plexus_internal_reactant_factory() {
        let factory = PlexusInternalReactantFactory::<DebugStruct, DebugCodec>::new();
        let ganglion_id = Uuid::now_v7();

        let reactant = factory.create_reactant(
            ganglion_id,
            empty_internal_ganglia(),
            empty_external_ganglia(),
            reaction_cache(),
        );

        // Verify that the factory created a reactant
        assert!(Arc::strong_count(&reactant) >= 1);
    }

    #[tokio::test]
    async fn test_plexus_external_internal_reactant_factory() {
        let factory = PlexusExternalInternalReactantFactory::<DebugStruct, DebugCodec>::new();
        let ganglion_id = Uuid::now_v7();

        let reactant =
            factory.create_reactant(ganglion_id, empty_internal_ganglia(), reaction_cache());

        // Verify that the factory created a reactant
        assert!(Arc::strong_count(&reactant) >= 1);
    }

    #[tokio::test]
    async fn test_plexus_external_external_reactant_factory() {
        let factory = PlexusExternalExternalReactantFactory::<DebugStruct, DebugCodec>::new();
        let ganglion_id = Uuid::now_v7();

        let reactant =
            factory.create_reactant(ganglion_id, empty_external_ganglia(), reaction_cache());

        // Verify that the factory created a reactant
        assert!(Arc::strong_count(&reactant) >= 1);
    }
}